diff --git a/Cargo.lock b/Cargo.lock index b27c5d2b..05e20b99 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4086,7 +4086,9 @@ name = "harmony-fleet-deploy" version = "0.1.0" dependencies = [ "anyhow", + "async-nats", "async-trait", + "chrono", "clap", "env_logger", "harmony", diff --git a/fleet/harmony-fleet-deploy/Cargo.toml b/fleet/harmony-fleet-deploy/Cargo.toml index da44b0c6..d819f887 100644 --- a/fleet/harmony-fleet-deploy/Cargo.toml +++ b/fleet/harmony-fleet-deploy/Cargo.toml @@ -25,7 +25,9 @@ harmony-fleet-auth = { path = "../harmony-fleet-auth" } harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" } anyhow = { workspace = true } +async-nats = { workspace = true } async-trait = { workspace = true } +chrono = { workspace = true, features = ["serde"] } clap = { workspace = true } k8s-openapi = { workspace = true } kube = { workspace = true, features = ["runtime", "derive"] } diff --git a/fleet/harmony-fleet-deploy/src/companion/logs/chunk.rs b/fleet/harmony-fleet-deploy/src/companion/logs/chunk.rs new file mode 100644 index 00000000..ee6d5210 --- /dev/null +++ b/fleet/harmony-fleet-deploy/src/companion/logs/chunk.rs @@ -0,0 +1,220 @@ +//! The value a [`LogQuery`](super::LogQuery) returns: a chunk of log +//! lines with the metadata the dashboard needs to render it honestly. +//! +//! Kept as a pure data type — no transport, no async, no IO. The +//! transport layer (the `PodmanLogQuery` impl, the agent's command +//! handler) constructs and consumes it; the dashboard renders it. +//! That separation makes the trait easy to mock for unit tests and +//! easy to evolve (e.g. add `level` per line) without churning every +//! call site. + +use chrono::{DateTime, Utc}; +use thiserror::Error; + +/// Maximum byte length of a [`LogSource`] string. Bounded so a +/// misbehaving agent can't OOM the dashboard with a multi-megabyte +/// "source" field. +pub const LOG_SOURCE_MAX_BYTES: usize = 128; + +/// Stable identifier for the log stream a [`LogChunk`] came from +/// (e.g. `podman:my-deployment/web`). +/// +/// **Validated newtype**: the value originates on a NATS reply from +/// an untrusted-until-authenticated agent, so we enforce shape +/// constraints at the deserialization boundary. Rules: +/// +/// - non-empty +/// - no ASCII control characters (`\0`..`\x1f`, `\x7f`) +/// - at most [`LOG_SOURCE_MAX_BYTES`] bytes (UTF-8 byte length) +/// +/// A bad value is surfaced via [`LogQueryError::InvalidReply`], not +/// folded into a generic decode failure — the dashboard logs the +/// rejected string for triage. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct LogSource(String); + +impl LogSource { + pub fn try_new(name: impl Into) -> Result { + let s = name.into(); + if s.is_empty() { + return Err(InvalidLogSource::Empty); + } + if s.len() > LOG_SOURCE_MAX_BYTES { + return Err(InvalidLogSource::TooLong { + bytes: s.len(), + max: LOG_SOURCE_MAX_BYTES, + }); + } + if let Some(c) = s.chars().find(|c| c.is_control()) { + return Err(InvalidLogSource::ControlChar(c)); + } + Ok(LogSource(s)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl std::fmt::Display for LogSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + +#[derive(Debug, Error, PartialEq, Eq)] +pub enum InvalidLogSource { + #[error("log source name must not be empty")] + Empty, + #[error("log source name is {bytes} bytes; max is {max}")] + TooLong { bytes: usize, max: usize }, + #[error("log source name contains control char {0:?}")] + ControlChar(char), +} + +/// A bounded chunk of log lines fetched from a device. +/// +/// "Chunk" is deliberate: this isn't "the logs" — it's the last N +/// lines, captured at one moment, in their original ordering. The +/// dashboard renders the chunk; refreshing fetches a new chunk. v0.4 +/// will add a streaming variant; for v0.3 the customer's mental +/// model is "show me what just happened", which a chunk answers +/// directly. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LogChunk { + /// Stable identifier for the log stream this chunk came from + /// (e.g. `podman:my-deployment/web`). Validated by [`LogSource`] + /// at deserialization time — see that type's docs for the shape + /// constraints. + pub source: LogSource, + /// When the chunk was captured **on the device**, not when the + /// operator received it. Surfaces clock skew between operator and + /// device — "logs from 12 seconds ago" is honest, "logs as of + /// now" would be a lie. + pub captured_at: DateTime, + /// Oldest-first. Empty when the container has produced no output + /// yet. Each entry is one container log line as the runtime + /// reported it; we do not split / merge / re-encode. + pub lines: Vec, + /// `true` when the runtime had more output than fit in the chunk + /// (either the per-request size budget or the runtime's own tail + /// limit). Surfacing this lets the dashboard show a "showing last + /// N of more" hint instead of silently hiding lines. + pub truncated: bool, +} + +impl LogChunk { + /// `true` when the device has produced no log lines yet for this + /// deployment. Distinct from "the device is offline" (that's a + /// transport error, not an empty chunk) — the dashboard renders + /// the two differently. + pub fn is_empty(&self) -> bool { + self.lines.is_empty() + } + + /// How many lines this chunk carries. + pub fn len(&self) -> usize { + self.lines.len() + } +} + +/// Why a log query failed. +/// +/// Four arms, deliberately mapped to four different operator actions: +/// +/// - [`LogQueryError::DeviceOffline`] — no agent listening. The +/// dashboard shows "device is offline"; retrying immediately is +/// pointless. +/// - [`LogQueryError::Timeout`] — agent didn't reply in time. Could +/// be a slow podman socket, a saturated network, or a hung agent. +/// The dashboard offers a retry button. +/// - [`LogQueryError::Agent`] — the agent replied but reported its +/// own failure (deployment not found, container has no log driver, +/// etc.). The dashboard renders the message verbatim — it's the +/// agent's responsibility to make these actionable. +/// - [`LogQueryError::BadReply`] — agent replied with bytes that +/// couldn't be deserialized. Indicates a version mismatch between +/// operator and agent; the dashboard shows "device protocol +/// mismatch — update the agent". +/// - [`LogQueryError::Transport`] — generic NATS transport failure +/// (publish failed, subscribe failed). Rare; usually a NATS server +/// problem, not a device problem. +/// +/// We mirror the categorization that `FleetCommandsClient::CommandError` +/// uses for `Ping` — both share the same NATS request/reply substrate, +/// so callers benefit from a uniform error shape across verbs. +#[derive(Debug, Error)] +pub enum LogQueryError { + #[error("device offline (no agent listening on the logs subject)")] + DeviceOffline, + #[error("log query timed out after {0:?}")] + Timeout(std::time::Duration), + #[error("agent reported an error: {0}")] + Agent(String), + #[error("agent reply decode failed: {0}")] + BadReply(#[from] serde_json::Error), + #[error("nats transport: {0}")] + Transport(String), + /// The wire payload was syntactically valid but semantically + /// unusable — e.g. the agent returned a `source` string that + /// failed [`LogSource`] validation. We surface this as a + /// distinct arm (rather than folding into `BadReply`) so the + /// dashboard can log the bad value: it's almost always an agent + /// bug. + #[error("invalid reply contents: {0}")] + InvalidReply(String), +} + +#[cfg(test)] +mod tests { + use super::*; + + fn pn(s: &str) -> LogSource { + LogSource::try_new(s).expect("valid") + } + + #[test] + fn empty_chunk_reports_is_empty() { + // "No lines yet" is a legal, common state — the dashboard + // renders it as "no output". A separate state from "device + // offline". + let chunk = LogChunk { + source: pn("podman:hello"), + captured_at: Utc::now(), + lines: vec![], + truncated: false, + }; + assert!(chunk.is_empty()); + assert_eq!(chunk.len(), 0); + } + + #[test] + fn populated_chunk_preserves_line_order() { + // The trait contract is "oldest first". Reordering would + // misrender the dashboard (newest line at top of an + // oldest-first list is the *first* entry). + let chunk = LogChunk { + source: pn("podman:hello"), + captured_at: Utc::now(), + lines: vec!["a".into(), "b".into(), "c".into()], + truncated: false, + }; + assert_eq!(chunk.lines, vec!["a", "b", "c"]); + assert_eq!(chunk.len(), 3); + assert!(!chunk.is_empty()); + } + + #[test] + fn truncated_flag_is_independent_of_emptiness() { + // A chunk can be both truncated and have lines (we cut the + // head) — assert both states coexist. + let chunk = LogChunk { + source: pn("podman:hello"), + captured_at: Utc::now(), + lines: vec!["latest".into()], + truncated: true, + }; + assert!(!chunk.is_empty()); + assert!(chunk.truncated); + } +} diff --git a/fleet/harmony-fleet-deploy/src/companion/logs/mod.rs b/fleet/harmony-fleet-deploy/src/companion/logs/mod.rs new file mode 100644 index 00000000..da89c838 --- /dev/null +++ b/fleet/harmony-fleet-deploy/src/companion/logs/mod.rs @@ -0,0 +1,125 @@ +//! Log-tail contract — the third Score companion (ADR-023 P7). +//! +//! ## The big picture +//! +//! ```text +//! ┌────────────┐ pair ┌─────────────┐ request ┌──────────┐ +//! │ *Score │◀─type─────│ LogQuery │──────────────▶│ Agent │ +//! └────────────┘ locked └─────────────┘ (NATS r/r) └──────────┘ +//! │ │ +//! └─── deserialize reply ◀─────┘ +//! │ +//! ▼ +//! LogChunk +//! ``` +//! +//! - [`LogQuery`] is the companion trait: pair a [`Score`] with a way +//! to fetch the last N lines of *that specific Score's* container +//! logs from the device that's running it. The pairing is type-level +//! via the `Score` associated type — the compiler refuses to call +//! `log_query.last_lines(&podman_score, …)` with a query whose +//! `Score = FleetOperatorScore`. Same trick the smoke contract uses +//! (see [`super::smoke::SmokeTest`]). +//! - [`LogChunk`] is the value the trait returns. It's a pure data +//! type — easy to serialize for the dashboard, easy to assert +//! against in tests, no transport types leak through. +//! - [`PodmanLogQuery`] is the only implementor in v0.3: it sends a +//! `device-commands..logs` request over NATS, deserializes the +//! reply into [`LogChunk`]. Agent-side handler + dashboard wiring +//! ship in a follow-up PR (see "Deferred" below). +//! +//! ## Why a companion and not a method on `Score`? +//! +//! Per ADR-023 P7, framework capabilities that "wrap" a Score (smoke, +//! planning, observability, log tail) attach as companions rather +//! than as new methods on `Score`/`Interpret`. The base trait surface +//! stays one method (`create_interpret`). Log tail is *something +//! paired with* a Score, not *something on* a Score — and a Score +//! without a `LogQuery` should render "no logs view" on the +//! dashboard, not become a compilation error. +//! +//! The orthogonal benefits, which mirror smoke's: +//! +//! - A Score can be deployed with one of several log backends (podman, +//! journald, k8s) without rewriting the Score itself. +//! - Test harnesses substitute a mock `LogQuery` for an end-to-end +//! probe of the dashboard's "View logs" path, without spinning up a +//! real device. +//! - Adding a "live tail (stream)" verb in v0.4 is an additive trait +//! method on `LogQuery`, not a breaking change to `Score`. +//! +//! ## Why sync `last_lines` and not a stream? +//! +//! v0.3 ships **single-shot** log tail only. Customer's mental model +//! is "I clicked View logs, show me the last 100 lines". That's a +//! `Future`, not a `Stream`. The +//! request/reply NATS pattern (one inbox, one reply, then close) maps +//! 1:1 to that mental model and reuses the same dispatcher the +//! `Ping` verb already runs through. +//! +//! Live tail is genuinely useful — but it's a separate UX (the +//! dashboard needs scrollback management, the agent needs to attach +//! to a running container's log stream, the wire format needs frame +//! framing + final-marker handling) — and shipping it half-baked +//! would degrade the "View logs" experience the customer is asking +//! for *today*. v0.4 adds a `tail_stream` method to this trait, +//! additive. +//! +//! ## Wire shape (locked in this PR) +//! +//! Subject: `device-commands..logs` — exactly the existing +//! callout-permitted prefix (see +//! `harmony_reconciler_contracts::SUBJECT_PREFIX`). The deployment +//! name and the line count live in the JSON request body, not in the +//! subject, because: +//! +//! 1. The agent already subscribes to `device-commands..>`; the +//! permission template covers this subject without any callout +//! JWT change (zero ops surface). +//! 2. Putting the deployment in the subject would alias the verb +//! token, breaking +//! [`harmony_reconciler_contracts::Verb::as_subject_token`]'s +//! "verb is the trailing token" invariant. +//! +//! The request body is [`harmony_reconciler_contracts::LogsRequest`], +//! the reply body is [`harmony_reconciler_contracts::LogsReply`]. +//! Those types live in `reconciler-contracts` (not here) so the +//! agent build, which must not depend on harmony, can serialize them. +//! This companion is a thin wrapper around that contract — it owns +//! transport, the contract crate owns the bytes. +//! +//! ## Deferred to follow-up PRs (in scope of v0.3) +//! +//! - **Agent-side command handler** for `Verb::Logs` in +//! `fleet/harmony-fleet-agent/src/command_server.rs`: parse the +//! `LogsRequest`, look up the deployment's container, run +//! `podman logs --tail N`, serialize a `LogsReply`. The +//! deployment→container resolution must reject names that don't +//! match `[a-zA-Z0-9_.-]{1,128}` before invoking the runtime — +//! defense-in-depth on top of `Command::arg` (which already +//! avoids shell parsing). That regex is *stricter* than +//! [`harmony_reconciler_contracts::DeploymentName`] on the wire, +//! reflecting that the agent's resolved container name has tighter +//! constraints than a deployment identifier. +//! - **Dashboard handler** at +//! `/deployments//devices//logs?lines=N` in +//! `fleet/harmony-fleet-operator/src/frontend/server.rs`: +//! instantiate a `PodmanLogQuery`, call `last_lines`, render the +//! `LogChunk` as maud. +//! - **E2E integration test** that drives a real podman container +//! end-to-end through this trait. +//! +//! Splitting this is a scope-discipline call: the trait + value + +//! NATS-transport impl + unit tests are merge-worthy on their own +//! because they lock the contract. Once merged, the agent handler PR +//! is a focused diff against `command_server.rs` and the dashboard +//! handler PR is a focused diff against `server.rs` — both small, +//! both reviewable, both backed by this PR's existing tests. + +pub mod chunk; +pub mod podman; +pub mod query; + +pub use chunk::{LogChunk, LogQueryError}; +pub use podman::PodmanLogQuery; +pub use query::LogQuery; diff --git a/fleet/harmony-fleet-deploy/src/companion/logs/podman.rs b/fleet/harmony-fleet-deploy/src/companion/logs/podman.rs new file mode 100644 index 00000000..02be8627 --- /dev/null +++ b/fleet/harmony-fleet-deploy/src/companion/logs/podman.rs @@ -0,0 +1,378 @@ +//! `PodmanLogQuery` — the NATS request/reply implementation of +//! [`LogQuery`] for a [`PodmanV0Score`]. +//! +//! Transport: NATS request on +//! `device-commands..logs`. Body: a JSON +//! [`LogsRequest`] carrying the deployment name and a clamped line +//! count. Reply: a JSON [`LogsReply`] which we re-wrap into a +//! [`LogChunk`]. +//! +//! The query is constructed with the device id and deployment name +//! it targets — those are the routing parameters that the dashboard +//! handler already has on the URL path +//! (`/deployments//devices//logs`). The `LogQuery` trait +//! method takes `&Self::Score` for the compile-time pairing +//! guarantee, but the wire request doesn't need the score body +//! itself (the agent reads its own deployment KV). +//! +//! Why not let the trait method take `device_id`/`deployment` as +//! parameters? Because the trait must stay backend-agnostic. A +//! future `K8sLogQuery` doesn't have a device id; it has a pod name. +//! Routing parameters belong on the implementor, not on the trait. +//! +//! ## Split between routing and transport +//! +//! [`LogQueryRouting`] is the pure routing value: device id + +//! deployment + line cap, no transport. It owns `subject()` and +//! `request_body()` so unit tests can verify the wire bytes without +//! constructing a NATS client. [`PodmanLogQuery`] adds the +//! `async_nats::Client` on top and implements [`LogQuery`]. + +use std::time::Duration; + +use async_nats::Client; +use async_nats::error::Error as NatsError; +use async_trait::async_trait; +use harmony::modules::podman::{PodmanTopology, PodmanV0Score}; +use harmony_reconciler_contracts::{ + CommandRequest, DeploymentName, ErrorReply, Id, LOGS_MAX_LINES, LogsReply, LogsRequest, Verb, + device_command_subject, +}; + +use super::chunk::{LogChunk, LogQueryError, LogSource}; +use super::query::LogQuery; + +/// Default reply timeout for a single log query. 10 s gives a slow +/// podman socket (cold-cached image, busy device) room to breathe; +/// offline devices get short-circuited earlier by NATS's +/// `no_responders` reply, so the timeout only matters when the agent +/// is alive but slow. +pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); + +/// Routing parameters for one log query. Pure data — no NATS client, +/// no IO. Lets unit tests verify the wire bytes without a connected +/// client. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LogQueryRouting { + pub device_id: Id, + pub deployment: DeploymentName, +} + +impl LogQueryRouting { + pub fn new(device_id: Id, deployment: DeploymentName) -> Self { + Self { + device_id, + deployment, + } + } + + /// Build the NATS subject this query publishes to. Kept on the + /// routing value (not on the transport struct) so the agent-side + /// dispatcher tests in the follow-up PR can use the same helper + /// without pulling in a NATS client. + pub fn subject(&self) -> String { + device_command_subject(self.device_id.to_string().as_str(), Verb::Logs) + } + + /// Serialize the wire body. Clamps `n` to [`LOGS_MAX_LINES`] so + /// a buggy "show all" dashboard button can't ask for an + /// unbounded transfer. + pub fn request_body(&self, n: usize) -> Vec { + let req = CommandRequest::Logs(LogsRequest { + deployment: self.deployment.clone(), + lines: clamp_lines(n), + }); + serde_json::to_vec(&req).expect("CommandRequest is always serializable") + } +} + +/// Pin the conversion `usize -> u32` to one place so the clamping +/// behavior is testable in isolation. We saturate at +/// [`LOGS_MAX_LINES`] both because (a) the agent will clamp anyway, +/// and (b) sending a request advertising a smaller `lines` value +/// than the agent's cap makes the protocol log easier to read. +fn clamp_lines(n: usize) -> u32 { + let n_u32 = u32::try_from(n).unwrap_or(LOGS_MAX_LINES); + n_u32.min(LOGS_MAX_LINES) +} + +/// Operator-side log fetcher for a podman-runtime device. +/// +/// Holds the [`LogQueryRouting`] for the device+deployment it +/// targets plus the connected NATS client; the dashboard builds one +/// per request from the URL parts, then calls +/// `last_lines(&score, &topology, n)`. +pub struct PodmanLogQuery { + nc: Client, + routing: LogQueryRouting, + timeout: Duration, +} + +impl PodmanLogQuery { + pub fn new(nc: Client, device_id: Id, deployment: DeploymentName) -> Self { + Self { + nc, + routing: LogQueryRouting::new(device_id, deployment), + timeout: DEFAULT_TIMEOUT, + } + } + + /// Override the default reply timeout. Tests use a very small + /// value to keep the suite fast; production sticks with + /// [`DEFAULT_TIMEOUT`]. + pub fn with_timeout(mut self, timeout: Duration) -> Self { + self.timeout = timeout; + self + } + + pub fn routing(&self) -> &LogQueryRouting { + &self.routing + } +} + +#[async_trait] +impl LogQuery for PodmanLogQuery { + type Score = PodmanV0Score; + + async fn last_lines( + &self, + _score: &Self::Score, + _topology: &PodmanTopology, + n: usize, + ) -> Result { + let subject = self.routing.subject(); + let payload = self.routing.request_body(n); + + let fut = self.nc.request(subject, payload.into()); + let resp = match tokio::time::timeout(self.timeout, fut).await { + Ok(Ok(msg)) => msg, + Ok(Err(err)) => return Err(map_request_error(err, self.timeout)), + Err(_) => return Err(LogQueryError::Timeout(self.timeout)), + }; + + decode_reply(&resp.payload) + } +} + +/// Map an `async-nats` `RequestError` to our typed surface. Same +/// shape `FleetCommandsClient` uses for `Ping` — we propagate +/// `NoResponders` and `TimedOut` so the dashboard can render them +/// differently. +fn map_request_error( + err: NatsError, + timeout: Duration, +) -> LogQueryError { + use async_nats::client::RequestErrorKind; + match err.kind() { + RequestErrorKind::NoResponders => LogQueryError::DeviceOffline, + RequestErrorKind::TimedOut => LogQueryError::Timeout(timeout), + _ => LogQueryError::Transport(err.to_string()), + } +} + +/// Decode a reply payload. The agent can reply with either a +/// [`LogsReply`] (happy path) or an [`ErrorReply`] (the agent +/// successfully received the request but couldn't fulfil it). We try +/// `LogsReply` first because it's the common case; only if that +/// fails do we attempt `ErrorReply`. Falling all the way through +/// yields [`LogQueryError::InvalidReply`]. +/// +/// `pub(crate)` so unit tests can drive it without a live NATS +/// server. Not part of the public API — callers always go through +/// [`PodmanLogQuery::last_lines`]. +pub(crate) fn decode_reply(bytes: &[u8]) -> Result { + // Happy path first — keeps the cold path cost on errors only. + if let Ok(reply) = serde_json::from_slice::(bytes) { + let source = LogSource::try_new(reply.source.clone()) + .map_err(|e| LogQueryError::InvalidReply(format!("invalid source: {e}")))?; + return Ok(LogChunk { + source, + captured_at: reply.captured_at, + lines: reply.lines, + truncated: reply.truncated, + }); + } + if let Ok(err) = serde_json::from_slice::(bytes) { + return Err(LogQueryError::Agent(err.message)); + } + // Neither shape matched — the agent is speaking a protocol we + // don't understand. The dashboard will render this as a version + // mismatch. + Err(LogQueryError::InvalidReply(format!( + "reply did not match LogsReply or ErrorReply ({} bytes)", + bytes.len() + ))) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{DateTime, Utc}; + use harmony_reconciler_contracts::{ErrorKind, ErrorReply}; + + fn dn(s: &str) -> DeploymentName { + DeploymentName::try_new(s).expect("valid") + } + + fn routing() -> LogQueryRouting { + LogQueryRouting::new(Id::from("pi-42".to_string()), dn("hello-web")) + } + + // -- clamp_lines --------------------------------------------------------- + + #[test] + fn clamp_lines_caps_at_protocol_max() { + // Operators can call `last_lines(usize::MAX, …)` — the wire + // value must stay within the budget the agent enforces. + assert_eq!(clamp_lines(usize::MAX), LOGS_MAX_LINES); + assert_eq!(clamp_lines(LOGS_MAX_LINES as usize + 1), LOGS_MAX_LINES); + } + + #[test] + fn clamp_lines_passes_through_small_values() { + assert_eq!(clamp_lines(0), 0); + assert_eq!(clamp_lines(1), 1); + assert_eq!(clamp_lines(100), 100); + } + + // -- LogQueryRouting::subject() ----------------------------------------- + + #[test] + fn subject_matches_documented_format() { + // The subject the agent's existing `device-commands..>` + // subscription receives. If this drifts, the callout + // permission template stops covering the request — silent + // routing failure under JWT enforcement. + let r = routing(); + assert_eq!(r.subject(), "device-commands.pi-42.logs"); + } + + // -- LogQueryRouting::request_body() ------------------------------------ + + #[test] + fn request_body_is_tagged_json_with_clamped_lines() { + // Locks the wire format. Older agents that learn `Verb::Logs` + // expect this exact shape on receipt. + let r = routing(); + let body = r.request_body(50); + let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(parsed["verb"], "logs"); + assert_eq!(parsed["deployment"], "hello-web"); + assert_eq!(parsed["lines"], 50); + } + + #[test] + fn request_body_clamps_oversized_n() { + // A dashboard with a buggy "show all" button must not get + // through unchecked. We clamp on the operator side too — + // defense in depth, and a more honest log message ("asked + // for 1000") than "asked for usize::MAX, agent capped". + let r = routing(); + let body = r.request_body(usize::MAX); + let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(parsed["lines"], LOGS_MAX_LINES); + } + + // -- decode_reply() ------------------------------------------------------ + + #[test] + fn decode_reply_unwraps_logs_reply_into_log_chunk() { + let reply = LogsReply { + source: "podman:hello-web/api".to_string(), + captured_at: DateTime::parse_from_rfc3339("2026-01-15T12:34:56Z") + .unwrap() + .with_timezone(&Utc), + lines: vec!["line a".into(), "line b".into()], + truncated: false, + }; + let bytes = serde_json::to_vec(&reply).unwrap(); + let chunk = decode_reply(&bytes).expect("valid LogsReply must decode"); + assert_eq!(chunk.source.as_str(), "podman:hello-web/api"); + assert_eq!(chunk.lines, vec!["line a", "line b"]); + assert!(!chunk.truncated); + } + + #[test] + fn decode_reply_carries_truncated_flag() { + // The dashboard renders "showing last N of more" off this + // flag. If we accidentally drop it on the wire, that hint + // disappears and customers think they have full output. + let reply = LogsReply { + source: "podman:hello-web/api".to_string(), + captured_at: Utc::now(), + lines: vec!["only".into()], + truncated: true, + }; + let bytes = serde_json::to_vec(&reply).unwrap(); + let chunk = decode_reply(&bytes).unwrap(); + assert!(chunk.truncated); + } + + #[test] + fn decode_reply_surfaces_agent_error_reply_as_agent_variant() { + // The agent can fail without crashing — e.g. the deployment + // exists in KV but the container hasn't started yet. That + // must surface as `Agent(message)` so the dashboard renders + // the agent's own description, not "device offline". + let err = ErrorReply { + kind: ErrorKind::BadRequest, + message: "deployment hello-web has no containers running".into(), + }; + let bytes = serde_json::to_vec(&err).unwrap(); + let result = decode_reply(&bytes); + match result { + Err(LogQueryError::Agent(msg)) => { + assert!(msg.contains("hello-web")); + } + other => panic!("expected Agent variant, got {other:?}"), + } + } + + #[test] + fn decode_reply_rejects_invalid_source_name() { + // A malicious or buggy agent could return a `source` with + // control characters that would corrupt the dashboard. + // LogSource validation catches this — we surface it as + // InvalidReply, not a panic, and not a successful chunk. + let bad = serde_json::json!({ + "source": "bad\nname", + "captured_at": "2026-01-15T12:00:00Z", + "lines": [], + "truncated": false, + }); + let bytes = serde_json::to_vec(&bad).unwrap(); + match decode_reply(&bytes) { + Err(LogQueryError::InvalidReply(_)) => {} + other => panic!("expected InvalidReply, got {other:?}"), + } + } + + #[test] + fn decode_reply_falls_through_to_invalid_for_unknown_shape() { + // Neither LogsReply nor ErrorReply — a future verb's reply + // accidentally routed here, or an upgraded agent talking a + // newer protocol. The dashboard shows "protocol mismatch". + let bytes = br#"{"completely":"unrelated"}"#; + match decode_reply(bytes) { + Err(LogQueryError::InvalidReply(_)) => {} + other => panic!("expected InvalidReply, got {other:?}"), + } + } + + // -- LogQuery pairing (compile-time) ------------------------------------ + + /// Compile-time test: `PodmanLogQuery::Score == PodmanV0Score`. + /// If a future refactor changes the associated type without + /// updating callers, this test fails to compile — which is + /// exactly the compile-time-safety guarantee the companion + /// pattern exists to provide. + #[test] + fn paired_score_type_is_podman_v0_score() { + fn assert_pair() + where + Q: LogQuery, + { + } + assert_pair::(); + } +} diff --git a/fleet/harmony-fleet-deploy/src/companion/logs/query.rs b/fleet/harmony-fleet-deploy/src/companion/logs/query.rs new file mode 100644 index 00000000..c3f73f9c --- /dev/null +++ b/fleet/harmony-fleet-deploy/src/companion/logs/query.rs @@ -0,0 +1,66 @@ +//! The `LogQuery` trait — companion contract for "fetch the last N +//! lines of this Score's container logs". +//! +//! The trait pairs a [`Score`] with a way to query its logs, with the +//! pairing locked at the type level via the `Score` associated type. +//! Same mechanism the smoke contract uses; same compile-time +//! guarantee that callers can't pass a `PodmanLogQuery` to a +//! `FleetOperatorScore`. +//! +//! Object safety: kept *not* object-safe on purpose. Callers in v0.3 +//! always know the concrete Score type at the call site (the +//! dashboard handler is `Score = PodmanV0Score` at compile time — +//! topologies are compile-time per ADR-023 P6). When a second Score +//! type starts shipping logs (e.g. k8s `Deployment`), we add another +//! impl, not a `Box`. + +use async_trait::async_trait; + +use harmony::score::Score; +use harmony::topology::Topology; + +use super::chunk::{LogChunk, LogQueryError}; + +/// Pair a [`Score`] with a way to fetch the last N lines of its +/// container logs. +/// +/// Implementors live next to the transport they use: +/// [`super::PodmanLogQuery`] for NATS request/reply against the fleet +/// agent's podman runtime. Future impls (`K8sLogQuery`, +/// `JournaldLogQuery`) follow the same shape. +/// +/// Why a trait and not a free function? Because mocking is the point +/// of this companion. The dashboard handler takes +/// `&dyn LogQuery<…>` (statically dispatched in v0.3, but the trait +/// shape is mock-friendly): the e2e test substitutes a +/// `FixtureLogQuery` that returns a canned `LogChunk` without needing +/// a real device, a real NATS server, or a real podman socket. That +/// makes the dashboard's "View logs" path testable in seconds, not +/// minutes. +#[async_trait] +pub trait LogQuery: Send + Sync { + /// The Score this query is paired with. Type-locked so the + /// compiler refuses `podman.last_lines(&operator_score, …)` — + /// the wrong Score type doesn't even compile. + type Score: Score; + + /// Fetch the most recent `n` lines of `score`'s container logs. + /// + /// Semantics: + /// - Returns oldest-first lines, up to `n`. + /// - If the runtime has fewer than `n` lines, returns all of them + /// with `LogChunk::truncated = false`. + /// - If the per-request size budget cuts the response, + /// `LogChunk::truncated = true`. + /// - `n == 0` is legal and returns an empty chunk. Callers should + /// prefer a sensible default (e.g. 100) at the UI layer rather + /// than relying on this edge case. + /// + /// Failure modes are categorized via [`LogQueryError`]. + async fn last_lines( + &self, + score: &Self::Score, + topology: &T, + n: usize, + ) -> Result; +} diff --git a/fleet/harmony-fleet-deploy/src/companion/mod.rs b/fleet/harmony-fleet-deploy/src/companion/mod.rs index 52d32d43..61bb16bc 100644 --- a/fleet/harmony-fleet-deploy/src/companion/mod.rs +++ b/fleet/harmony-fleet-deploy/src/companion/mod.rs @@ -13,13 +13,19 @@ //! - [`AgentObservation`] — derived view of a fleet agent's KV watch //! filter, computed from a typed `AgentConfig`. Used by the e2e //! harness to write desired-state keys the agent will pick up. -//! - [`smoke`] — the smoke-test contract. `Probe` trait, `SmokeSuite` -//! composition, `SmokeTest` companion, and the `deploy`/ -//! `deploy_with_smoke` wrappers that bind a Score's interpret to -//! an optional smoke run. Implements ADR-023 P4 ("deploy returns -//! only after smoke-test success") via the companion seam from P7. +//! - [`smoke`] — the smoke-test contract. One `SmokeTest` trait with +//! a single `verify` method returning a `SmokeReport`, plus +//! `deploy` / `deploy_with_smoke` free functions. Implements +//! ADR-023 P4 ("deploy returns only after smoke-test success") via +//! the companion seam from P7. +//! - [`logs`] — the log-tail contract. `LogQuery` companion trait, +//! `LogChunk` value, and the `PodmanLogQuery` NATS request/reply +//! implementation. Powers the dashboard's "View logs" path — +//! customer clicks the button, gets the last N lines from the +//! device. Live tail (streaming) is v0.4. pub mod agent; +pub mod logs; pub mod smoke; pub use agent::AgentObservation; diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke.rs b/fleet/harmony-fleet-deploy/src/companion/smoke.rs new file mode 100644 index 00000000..6fe238f4 --- /dev/null +++ b/fleet/harmony-fleet-deploy/src/companion/smoke.rs @@ -0,0 +1,633 @@ +//! Smoke-test companion — minimal shape. +//! +//! Implements ADR-023 P4 ("deploy returns only after smoke-test +//! success") and P7 ("extend Scores with companions, not API +//! changes"). The earlier draft of this module decomposed smoke into +//! `Probe` / `ProbeAttempt` / `ProbeOutcome` / `ProbeFailure` / +//! `ProbeName` / `RetryPolicy` / `SmokeSuite` / `SmokeStage` / +//! `SmokeReport` / `SmokeTest` / `SmokeAssemblyError`. That was +//! cardinality matching gone overboard for what is, in the end, "run +//! an async function after deploy and surface a pipeline report." +//! This rewrite picks the smallest seam that still buys the things we +//! actually need. +//! +//! ## What we kept +//! +//! - [`SmokeTest`] — one trait, one async method. Implementer writes a +//! normal Rust function and returns a [`SmokeReport`]. +//! - Type-level pairing via the associated `type Interpret` +//! (suggested in PR #292 review): one smoke can cover every Score +//! that shares an Interpret (e.g. `HelmChartScore` + any preset on +//! top of it). **Pairing is convention-only in v0.3** — +//! `Score::create_interpret` returns a `Box>`, so +//! the `deploy_with_smoke` call site can't statically refuse a +//! mismatched smoke. Closing that gap is an additive refactor on +//! the `Score` trait (`type Interpret: Interpret;`) and is +//! deliberately deferred — the API shape supports it. +//! - [`deploy`] / [`deploy_with_smoke`] — free functions. `deploy` +//! runs the Score; `deploy_with_smoke` runs the Score then blocks +//! on smoke. +//! - Pipeline data the dashboard renders top-to-bottom +//! ([`CheckReport`] entries on [`SmokeReport`]). +//! +//! ## What we dropped, and what to do instead +//! +//! | Old type | Replaced by | +//! |---------------------------------------|-----------------------------------------------------------------------------| +//! | `Probe`, `ProbeAttempt`, `ProbeOutcome`, `ProbeFailure` | Write a normal `async fn` inside `verify`; push `CheckReport`s into a `Vec`. | +//! | `ProbeName` (validated newtype) | `&'static str` — the names come from our own static code, not user input. | +//! | `RetryPolicy` value type | [`poll_until`] free function. Pass the budget + interval directly. | +//! | `SmokeSuite` builder | A `Vec` collected inside `verify`. No intermediate type. | +//! | `TcpReachable` probe impl | [`tcp_reachable`] free function. Call it; it returns a `CheckReport`. | +//! +//! Reusability is preserved: the helpers ([`poll_until`], +//! [`tcp_reachable`]) compose by simple function call. A future +//! `http_healthy(url)` or `k8s_pod_ready(client, name)` belongs in +//! this module as another `async fn -> CheckReport`, not as another +//! trait. + +use std::future::Future; +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use harmony::interpret::{Interpret, InterpretError, Outcome}; +use harmony::inventory::Inventory; +use harmony::score::Score; +use harmony::topology::Topology; +use thiserror::Error; +use tracing::{info, info_span}; + +/// Pair an [`Interpret`] type with an async smoke verification. +/// +/// One method. Implementers compose their checks inline; no probe +/// trait, no suite builder, no retry-policy value type. If retries +/// are needed, wrap the closure with [`poll_until`]; otherwise just +/// `await` once. +/// +/// Associating with `Interpret` (not the concrete `Score`) lets a +/// single smoke impl cover a family of Scores that share an +/// Interpret. Convention-only enforcement in v0.3 — see the +/// module-level docs. +#[async_trait] +pub trait SmokeTest: Send + Sync { + /// The Interpret class this smoke verifies. Documentary in v0.3; + /// becomes compile-time enforceable once `Score` gains its own + /// `type Interpret`. + type Interpret: Interpret; + + /// Run all checks. Implementers are free to short-circuit on + /// first failure, or run every check to give the dashboard a + /// full per-step picture. The framework reads `report.passed()` + /// to gate the deploy and renders `report.checks` for the user. + async fn verify(&self, topology: &T) -> SmokeReport; +} + +/// Aggregated smoke result. Dashboards render `checks` top-to-bottom. +/// +/// A report is "passing" iff it has at least one check AND every +/// check passed. An empty report passes nothing — the framework +/// rejects empty reports the same way it rejects failing ones, so a +/// smoke impl that forgets to push any checks fails loudly instead +/// of letting a deploy through. +#[derive(Debug, Clone, Default)] +pub struct SmokeReport { + pub checks: Vec, +} + +impl SmokeReport { + pub fn passed(&self) -> bool { + !self.checks.is_empty() && self.checks.iter().all(|c| c.passed) + } + + pub fn is_empty(&self) -> bool { + self.checks.is_empty() + } + + /// Iterator over only the failures — handy for compact error + /// rendering without filtering at the call site. + pub fn failed(&self) -> impl Iterator { + self.checks.iter().filter(|c| !c.passed) + } +} + +/// One step's result, named for the dashboard and the operator's +/// log. `detail` is `Some` whenever there's something useful to show +/// (a connect error, the elapsed time, the response body's first +/// line) — leave it `None` for trivial passes. +#[derive(Debug, Clone)] +pub struct CheckReport { + pub name: &'static str, + pub passed: bool, + pub detail: Option, +} + +impl CheckReport { + pub fn pass(name: &'static str) -> Self { + Self { + name, + passed: true, + detail: None, + } + } + + pub fn pass_with(name: &'static str, detail: impl Into) -> Self { + Self { + name, + passed: true, + detail: Some(detail.into()), + } + } + + pub fn fail(name: &'static str, detail: impl Into) -> Self { + Self { + name, + passed: false, + detail: Some(detail.into()), + } + } +} + +/// Poll a fallible async closure until it returns `Ok` or `budget` +/// elapses. Returns a single [`CheckReport`] capturing the outcome. +/// +/// Use this inside [`SmokeTest::verify`] when you need retries. +/// Otherwise just `await` once and build a `CheckReport` from the +/// result — no helper required. +pub async fn poll_until( + name: &'static str, + budget: Duration, + interval: Duration, + check: F, +) -> CheckReport +where + F: Fn() -> Fut, + Fut: Future>, +{ + let deadline = Instant::now() + budget; + loop { + match check().await { + Ok(()) => return CheckReport::pass(name), + Err(e) => { + if Instant::now() >= deadline { + return CheckReport::fail(name, format!("did not pass within {budget:?}: {e}")); + } + tokio::time::sleep(interval).await; + } + } + } +} + +/// One TCP connect attempt against `addr`, gated by `timeout`. +/// +/// Wrap with [`poll_until`] for retry semantics: +/// +/// ```ignore +/// poll_until("nats-reachable", Duration::from_secs(30), Duration::from_secs(1), || async { +/// tcp_reachable("once", "nats:4222", Duration::from_secs(1)) +/// .await +/// .passed +/// .then_some(()) +/// .ok_or_else(|| "still not reachable".to_string()) +/// }).await +/// ``` +pub async fn tcp_reachable(name: &'static str, addr: &str, timeout: Duration) -> CheckReport { + match tokio::time::timeout(timeout, tokio::net::TcpStream::connect(addr)).await { + Ok(Ok(_)) => CheckReport::pass(name), + Ok(Err(e)) => CheckReport::fail(name, format!("connect failed: {e}")), + Err(_) => CheckReport::fail(name, format!("timeout after {timeout:?}")), + } +} + +/// What `deploy*` returns on success. `smoke` is `None` for the +/// no-smoke variant, `Some` (with `passed() == true`) otherwise. +#[derive(Debug, Clone)] +pub struct DeployOutcome { + pub interpret: Outcome, + pub smoke: Option, +} + +#[derive(Debug, Error)] +pub enum DeployError { + #[error("score interpret failed: {0}")] + Interpret(#[from] InterpretError), + #[error( + "smoke failed: {} of {} check(s) did not pass", + .report.failed().count(), + .report.checks.len(), + )] + SmokeFailed { + /// The Score deployed cleanly — preserved so the dashboard + /// can render the success message next to the smoke failure. + interpret: Outcome, + report: SmokeReport, + }, +} + +/// Deploy a Score; do **not** run any smoke. Equivalent to calling +/// `score.interpret(...)` directly — provided so callers can switch +/// between smoke / no-smoke at one site. +pub async fn deploy( + score: S, + inventory: &Inventory, + topology: &T, +) -> Result +where + S: Score, + T: Topology, +{ + let span = info_span!("deploy", score = score.name()); + let _g = span.enter(); + info!("deploy: starting interpret (no smoke)"); + let interpret = score.interpret(inventory, topology).await?; + info!(status = %interpret.status, "deploy: interpret returned"); + Ok(DeployOutcome { + interpret, + smoke: None, + }) +} + +/// Deploy a Score and block on its smoke test (ADR-023 P4). +/// +/// Returns `Ok` only when every check in the report passed. An empty +/// report is rejected the same way a failing one is — a smoke impl +/// that forgot to push checks fails the deploy rather than silently +/// passing. +pub async fn deploy_with_smoke( + score: S, + smoke: &ST, + inventory: &Inventory, + topology: &T, +) -> Result +where + S: Score, + ST: SmokeTest, + T: Topology, +{ + let span = info_span!("deploy", score = score.name(), smoke = true); + let _g = span.enter(); + info!("deploy: starting interpret"); + let interpret = score.interpret(inventory, topology).await?; + info!(status = %interpret.status, "deploy: interpret returned; running smoke"); + let report = smoke.verify(topology).await; + info!( + passed = report.passed(), + checks = report.checks.len(), + failed = report.failed().count(), + "deploy: smoke returned", + ); + if report.passed() { + Ok(DeployOutcome { + interpret, + smoke: Some(report), + }) + } else { + Err(DeployError::SmokeFailed { interpret, report }) + } +} + +#[cfg(test)] +mod tests { + //! Behavioural tests for the minimal smoke shape. + //! + //! These exercise the deploy ↔ smoke handoff against a trivial + //! in-process Score / Interpret / Topology so we don't depend on + //! any external infrastructure. The fixtures are intentionally + //! tiny — they only carry what each test actually touches. + + use super::*; + use async_trait::async_trait; + use harmony::data::Version; + use harmony::interpret::{InterpretName, InterpretStatus}; + use harmony::inventory::Inventory; + use harmony::topology::{PreparationError, PreparationOutcome}; + use harmony_types::id::Id; + use serde::Serialize; + use std::net::SocketAddr; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, Mutex}; + use tokio::net::TcpListener; + + #[derive(Debug)] + struct NoopTopology; + + #[async_trait] + impl Topology for NoopTopology { + fn name(&self) -> &str { + "noop" + } + async fn ensure_ready(&self) -> Result { + Ok(PreparationOutcome::Noop) + } + } + + /// A `Score` that records execute() calls and returns a configured + /// outcome. Tracks invocations so a test can assert "interpret ran + /// exactly once". + #[derive(Debug, Clone)] + struct CountingScore { + executed: Arc, + fail: bool, + } + + impl CountingScore { + fn new() -> Self { + Self { + executed: Arc::new(AtomicBool::new(false)), + fail: false, + } + } + fn failing() -> Self { + Self { + executed: Arc::new(AtomicBool::new(false)), + fail: true, + } + } + } + + impl Serialize for CountingScore { + fn serialize(&self, s: S) -> Result { + s.serialize_str("CountingScore") + } + } + + #[async_trait] + impl Score for CountingScore { + fn create_interpret(&self) -> Box> { + Box::new(CountingInterpret { + executed: self.executed.clone(), + fail: self.fail, + }) + } + fn name(&self) -> String { + "CountingScore".into() + } + } + + #[derive(Debug)] + struct CountingInterpret { + executed: Arc, + fail: bool, + } + + #[async_trait] + impl Interpret for CountingInterpret { + fn get_name(&self) -> InterpretName { + InterpretName::Custom("counting") + } + fn get_version(&self) -> Version { + Version::from("0.1.0").unwrap() + } + fn get_status(&self) -> InterpretStatus { + InterpretStatus::QUEUED + } + fn get_children(&self) -> Vec { + vec![] + } + async fn execute( + &self, + _i: &Inventory, + _t: &NoopTopology, + ) -> Result { + self.executed.store(true, Ordering::SeqCst); + if self.fail { + Err(InterpretError::new( + "counting: configured to fail".to_string(), + )) + } else { + Ok(Outcome::success("counting: ok".to_string())) + } + } + } + + /// A test smoke that returns a pre-recorded report and records + /// whether `verify` was called. Lets each test inject the exact + /// report shape it wants to drive `deploy_with_smoke` through. + struct FixedSmoke { + report: Mutex>, + verify_called: Arc, + } + + impl FixedSmoke { + fn new(report: SmokeReport) -> Self { + Self { + report: Mutex::new(Some(report)), + verify_called: Arc::new(AtomicBool::new(false)), + } + } + } + + #[async_trait] + impl SmokeTest for FixedSmoke { + type Interpret = CountingInterpret; + async fn verify(&self, _t: &NoopTopology) -> SmokeReport { + self.verify_called.store(true, Ordering::SeqCst); + self.report + .lock() + .unwrap() + .take() + .expect("verify called more than once") + } + } + + #[tokio::test] + async fn deploy_runs_interpret_and_returns_outcome() { + let score = CountingScore::new(); + let executed = score.executed.clone(); + let outcome = deploy(score, &Inventory::empty(), &NoopTopology) + .await + .expect("clean interpret should produce Ok"); + assert!(executed.load(Ordering::SeqCst), "interpret must run"); + assert!(outcome.smoke.is_none(), "no-smoke variant has no report"); + assert_eq!(outcome.interpret.message, "counting: ok"); + } + + #[tokio::test] + async fn deploy_propagates_interpret_failure() { + let result = deploy(CountingScore::failing(), &Inventory::empty(), &NoopTopology).await; + assert!(matches!(result, Err(DeployError::Interpret(_)))); + } + + #[tokio::test] + async fn deploy_with_smoke_returns_report_on_success() { + let score = CountingScore::new(); + let smoke = FixedSmoke::new(SmokeReport { + checks: vec![CheckReport::pass("a"), CheckReport::pass("b")], + }); + let outcome = deploy_with_smoke(score, &smoke, &Inventory::empty(), &NoopTopology) + .await + .expect("passing smoke should produce Ok"); + let report = outcome.smoke.expect("smoke report present on success"); + assert!(report.passed()); + assert_eq!(report.checks.len(), 2); + assert!(smoke.verify_called.load(Ordering::SeqCst)); + } + + #[tokio::test] + async fn deploy_with_smoke_fails_when_any_check_fails() { + // Pin the deploy-blocks-on-smoke contract: a single failed + // check fails the whole deploy, regardless of how many + // passed. The `interpret` field of the error must still be + // populated so the dashboard can show what succeeded + // alongside what didn't. + let smoke = FixedSmoke::new(SmokeReport { + checks: vec![ + CheckReport::pass("a"), + CheckReport::fail("b", "bad"), + CheckReport::pass("c"), + ], + }); + let err = deploy_with_smoke( + CountingScore::new(), + &smoke, + &Inventory::empty(), + &NoopTopology, + ) + .await + .expect_err("failing smoke must error"); + match err { + DeployError::SmokeFailed { report, interpret } => { + assert!(!report.passed()); + assert_eq!(report.failed().count(), 1); + assert_eq!(interpret.message, "counting: ok"); + } + other => panic!("expected SmokeFailed, got {other:?}"), + } + } + + #[tokio::test] + async fn deploy_with_smoke_rejects_empty_report() { + // A smoke impl that forgot to push any checks must fail + // loudly. Silent pass-through would defeat the entire point + // of the contract. + let smoke = FixedSmoke::new(SmokeReport::default()); + let err = deploy_with_smoke( + CountingScore::new(), + &smoke, + &Inventory::empty(), + &NoopTopology, + ) + .await + .expect_err("empty smoke must error"); + assert!(matches!(err, DeployError::SmokeFailed { .. })); + } + + #[tokio::test] + async fn deploy_with_smoke_skips_smoke_when_interpret_fails() { + // Interpret error short-circuits — running smoke against a + // half-deployed component would produce confusing cascade + // failures. + let smoke = FixedSmoke::new(SmokeReport { + checks: vec![CheckReport::pass("never-runs")], + }); + let verify_called = smoke.verify_called.clone(); + let err = deploy_with_smoke( + CountingScore::failing(), + &smoke, + &Inventory::empty(), + &NoopTopology, + ) + .await + .expect_err("interpret failure must error"); + assert!(matches!(err, DeployError::Interpret(_))); + assert!( + !verify_called.load(Ordering::SeqCst), + "smoke must NOT run when interpret fails", + ); + } + + #[tokio::test] + async fn smoke_report_passed_requires_nonempty_and_all_pass() { + assert!(!SmokeReport::default().passed()); + assert!( + !SmokeReport { + checks: vec![CheckReport::pass("a"), CheckReport::fail("b", "x")], + } + .passed() + ); + assert!( + SmokeReport { + checks: vec![CheckReport::pass("a"), CheckReport::pass("b")], + } + .passed() + ); + } + + #[tokio::test] + async fn poll_until_returns_pass_when_check_succeeds_within_budget() { + let calls = Arc::new(AtomicBool::new(false)); + let c = calls.clone(); + let check = poll_until( + "eventually-ok", + Duration::from_secs(2), + Duration::from_millis(10), + move || { + let c = c.clone(); + async move { + if c.swap(true, Ordering::SeqCst) { + Ok(()) + } else { + Err("not yet".into()) + } + } + }, + ) + .await; + assert!(check.passed); + assert_eq!(check.name, "eventually-ok"); + } + + #[tokio::test] + async fn poll_until_returns_fail_after_budget_elapses() { + let check = poll_until( + "never-ok", + Duration::from_millis(50), + Duration::from_millis(10), + || async { Err("still nope".into()) }, + ) + .await; + assert!(!check.passed); + assert!( + check + .detail + .as_deref() + .is_some_and(|d| d.contains("still nope")), + "detail must carry the last error, got {:?}", + check.detail, + ); + } + + #[tokio::test] + async fn tcp_reachable_passes_against_real_listener() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr: SocketAddr = listener.local_addr().unwrap(); + // Accept loop so connect() completes; otherwise SYN-RST. + tokio::spawn(async move { + let _ = listener.accept().await; + }); + + let check = tcp_reachable("loopback", &addr.to_string(), Duration::from_secs(1)).await; + assert!(check.passed, "loopback connect must succeed: {check:?}"); + } + + #[tokio::test] + async fn tcp_reachable_fails_with_timeout_when_no_listener() { + // RFC 5737 TEST-NET-1: guaranteed-non-routable; a SYN here + // will sit unanswered and we'll hit our own timeout deterministically. + let check = tcp_reachable("dead-net", "192.0.2.1:1", Duration::from_millis(150)).await; + assert!(!check.passed); + assert!( + check + .detail + .as_deref() + .is_some_and(|d| d.contains("timeout") || d.contains("connect failed")), + ); + } + + #[tokio::test] + async fn deploy_outcome_smoke_field_is_none_for_no_smoke_deploy() { + let outcome = deploy(CountingScore::new(), &Inventory::empty(), &NoopTopology) + .await + .unwrap(); + assert!(outcome.smoke.is_none()); + } +} diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/contract.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/contract.rs deleted file mode 100644 index 67165db2..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/contract.rs +++ /dev/null @@ -1,82 +0,0 @@ -//! The Score → SmokeTest pairing contract. -//! -//! [`SmokeTest`] is the companion trait that pairs a [`Score`] with -//! the probes that verify *that specific* Score's deployment worked. -//! The pairing is type-level via the `Score` associated type — the -//! compiler refuses to call -//! [`deploy_with_smoke`](super::deploy::deploy_with_smoke) with a -//! `FleetOperatorSmokeTest` for a `DnsScore`, because their -//! associated types don't match. This is the same trick that makes -//! `K8sBareTopology: PostgreSQL` an unsatisfied trait bound when you -//! accidentally try to run PostgreSQL on a topology that doesn't -//! advertise it — see ADR-024 §2 and JG's *For the Love of -//! Compilers* talk. -//! -//! ## Why not a method on `Score`? -//! -//! Per ADR-023 P7, framework capabilities attach as companions, not -//! as new methods on `Score`/`Interpret`. The base trait surface -//! stays one method (`create_interpret`). Smoke is *something paired -//! with* a Score, not *something on* a Score. The orthogonal -//! benefit: a Score can be deployed without smoke, can be deployed -//! with one of several smoke configurations (dev, prod, e2e), or -//! can have its smoke evolved independently of its interpret logic. -//! -//! ## Why `assemble` returns a value, not a future-of-checks? -//! -//! The probes a smoke test wants to run are usually known up front -//! from the Score's data — "the service name we just deployed, -//! check it on the cluster's default DNS". Resolving that into a -//! concrete `SmokeSuite` *before* the probes run means the dashboard -//! can render the pipeline (probe names, stage count) the moment -//! smoke starts. If it returned a `Stream`, the -//! pipeline shape wouldn't be knowable until the run completed. - -use async_trait::async_trait; -use thiserror::Error; - -use harmony::score::Score; -use harmony::topology::Topology; - -use super::suite::SmokeSuite; - -/// Pair a [`Score`] with the [`SmokeSuite`] that proves its deploy -/// converged. -/// -/// Implementors live next to the Score they pair with — the -/// `FleetOperatorSmokeTest` ships from this crate alongside -/// `FleetOperatorScore`, the same way [`crate::AgentObservation`] -/// lives alongside `FleetAgentScore`. This is the companion -/// placement rule from ADR-023 §"Extend Scores with companions". -#[async_trait] -pub trait SmokeTest: Send + Sync { - /// The Score this smoke test verifies. The type lock means - /// `SM::Score = S` is enforced at every call site. - type Score: Score; - - /// Build the [`SmokeSuite`] that will run after the Score's - /// `interpret` succeeds. Has access to the Score so probes can - /// be parameterized on the deploy's actual inputs (service - /// names, namespaces, ports), and to the topology so probes can - /// pull credentials / endpoints from declared capabilities. - async fn assemble( - &self, - score: &Self::Score, - topology: &T, - ) -> Result, SmokeAssemblyError>; -} - -/// Why a smoke test failed to *build* (before any probe runs). -/// -/// Distinct from probe failure — assembly failure means the Score -/// is in an unexpected shape (missing endpoint, can't resolve a -/// reference). The dashboard shows this differently from a probe -/// failure: it's an authoring / configuration error, not a deployed- -/// system error. -#[derive(Debug, Clone, Error, PartialEq, Eq)] -pub enum SmokeAssemblyError { - #[error("smoke test could not derive endpoint for {what}: {detail}")] - MissingEndpoint { what: String, detail: String }, - #[error("smoke test assembly failed: {0}")] - Other(String), -} diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/deploy.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/deploy.rs deleted file mode 100644 index aa1aa040..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/deploy.rs +++ /dev/null @@ -1,387 +0,0 @@ -//! `deploy` and `deploy_with_smoke` — the framework-glue wrappers -//! that bind a [`Score`]'s interpret to an optional [`SmokeTest`]. -//! -//! These are free functions, not methods on [`Score`]/[`Maestro`] -//! /[`Interpret`]. The framework's domain directory is left -//! untouched in this PR (ADR-023 P7 — "extend Scores with companions, -//! not API changes"). If a later PR proves this shape, promoting -//! `deploy_with_smoke` to a `Maestro` convenience is a one-line -//! additive change. -//! -//! ## Opting in or out -//! -//! Two entry points, one decision at the call site: -//! -//! - [`deploy`] — run the Score's interpret, no smoke. -//! - [`deploy_with_smoke`] — run the Score's interpret, then the -//! paired [`SmokeTest`]. Returns only after the smoke run -//! completes; a failed smoke is a deploy error per the project -//! rule "deploy blocks on smoke-test". -//! -//! Callers (CLI binaries, the e2e harness, the operator's -//! reconciler) choose which to call. There is no global config knob -//! and no per-Score override — the choice is one function call. - -use harmony::interpret::{InterpretError, Outcome}; -use harmony::inventory::Inventory; -use harmony::score::Score; -use harmony::topology::Topology; -use thiserror::Error; -use tracing::{info, info_span}; - -use super::contract::{SmokeAssemblyError, SmokeTest}; -use super::suite::SmokeReport; - -/// What [`deploy`]/[`deploy_with_smoke`] return on success. -/// -/// `interpret` is the underlying `Score::interpret` result — -/// preserved verbatim so callers that previously consumed `Outcome` -/// directly continue to work after switching to these wrappers. -/// `smoke` is `None` when the no-smoke variant was used; `Some` -/// otherwise, with `passed() == true` guaranteed (a failed smoke is -/// a [`DeployError::SmokeFailed`]). -#[derive(Debug, Clone)] -pub struct DeployOutcome { - pub interpret: Outcome, - pub smoke: Option, -} - -/// All the ways a deploy can fail. Three arms, each pointing at a -/// concrete cause the operator can act on: -/// -/// - `Interpret` — the Score's own work failed (helm error, kube -/// apply error, etc). Same shape callers got before smoke existed. -/// - `SmokeAssembly` — the Score deployed, but its `SmokeTest` -/// couldn't even *build* a suite. Typically means the test's -/// author is missing an endpoint reference; an authoring bug, not -/// a runtime bug. -/// - `SmokeFailed` — the Score deployed and the suite built, but -/// one or more probes failed. The full [`SmokeReport`] is -/// attached so the dashboard can render which stages passed and -/// which didn't, instead of a single opaque "deploy failed". -#[derive(Debug, Error)] -pub enum DeployError { - #[error("score interpret failed: {0}")] - Interpret(#[from] InterpretError), - #[error("smoke assembly failed: {0}")] - SmokeAssembly(#[from] SmokeAssemblyError), - #[error("smoke test failed ({} of {} probe(s) did not pass)", .report.failed().count(), .report.probes.len())] - SmokeFailed { - /// The Score deployed cleanly — `interpret` is preserved so - /// callers (and the dashboard) can render the underlying - /// success message alongside the smoke failure. - interpret: Outcome, - report: SmokeReport, - }, -} - -/// Deploy a Score without running any smoke checks. -/// -/// Returns immediately after `score.interpret` returns. Equivalent -/// to calling `score.interpret(...)` directly — provided here so -/// callers can switch between smoke / no-smoke without changing -/// the surrounding control flow. -pub async fn deploy( - score: S, - inventory: &Inventory, - topology: &T, -) -> Result -where - T: Topology, - S: Score, -{ - let span = info_span!("deploy", score = score.name()); - let _g = span.enter(); - info!("deploy: starting interpret (no smoke)"); - let interpret = score.interpret(inventory, topology).await?; - info!(status = %interpret.status, "deploy: interpret returned"); - Ok(DeployOutcome { - interpret, - smoke: None, - }) -} - -/// Deploy a Score, then run its paired SmokeTest. Block on the -/// suite — see project rule "deploy blocks on smoke-test". -/// -/// The type bound `SM: SmokeTest` is where the -/// compile-time pairing lives: the `SmokeTest` you pass must -/// declare *this* Score type as its associated `Score`. A mismatch -/// is rejected at the call site, not at runtime. (See ADR-024 §2 -/// and JG's compile-time-feedback principle.) -pub async fn deploy_with_smoke( - score: S, - smoke: SM, - inventory: &Inventory, - topology: &T, -) -> Result -where - T: Topology, - S: Score, - SM: SmokeTest, -{ - let span = info_span!("deploy", score = score.name(), smoke = true); - let _g = span.enter(); - - info!("deploy: starting interpret"); - let interpret = score.interpret(inventory, topology).await?; - info!(status = %interpret.status, "deploy: interpret returned, assembling smoke"); - - let suite = smoke.assemble(&score, topology).await?; - info!(stages = suite.len(), "deploy: smoke suite assembled"); - - let report = suite.run(topology).await; - info!( - passed = report.passed(), - elapsed_ms = report.elapsed.as_millis() as u64, - "deploy: smoke finished", - ); - - if !report.passed() { - return Err(DeployError::SmokeFailed { interpret, report }); - } - - Ok(DeployOutcome { - interpret, - smoke: Some(report), - }) -} - -// ---- tests ----------------------------------------------------------------- - -#[cfg(test)] -mod tests { - use super::*; - use crate::companion::smoke::probe::{Probe, ProbeAttempt, ProbeName, RetryPolicy}; - use crate::companion::smoke::suite::SmokeSuite; - use async_trait::async_trait; - use harmony::data::Version; - use harmony::interpret::{Interpret, InterpretName, InterpretStatus}; - use harmony::topology::PreparationOutcome; - use harmony_types::id::Id; - use serde::Serialize; - use std::sync::Mutex; - - // -- test fixtures: a NoopTopology + minimal Score + minimal Probe -------- - - #[derive(Debug)] - struct NoopTopology; - - #[async_trait] - impl Topology for NoopTopology { - fn name(&self) -> &str { - "noop" - } - async fn ensure_ready( - &self, - ) -> Result { - Ok(PreparationOutcome::Noop) - } - } - - /// Minimal Score whose interpret can be scripted to succeed or - /// fail. Demonstrates that `deploy_with_smoke` short-circuits - /// correctly when interpret fails. - #[derive(Debug, Clone, Serialize)] - struct TrivialScore { - label: String, - should_fail: bool, - } - - impl Score for TrivialScore { - fn create_interpret(&self) -> Box> { - Box::new(TrivialInterpret { - should_fail: self.should_fail, - label: self.label.clone(), - }) - } - fn name(&self) -> String { - format!("TrivialScore({})", self.label) - } - } - - #[derive(Debug)] - struct TrivialInterpret { - should_fail: bool, - label: String, - } - - #[async_trait] - impl Interpret for TrivialInterpret { - async fn execute( - &self, - _inventory: &Inventory, - _topology: &T, - ) -> Result { - if self.should_fail { - Err(InterpretError::new(format!( - "scripted failure: {}", - self.label - ))) - } else { - Ok(Outcome::success(format!("deployed {}", self.label))) - } - } - fn get_name(&self) -> InterpretName { - InterpretName::Custom("TrivialInterpret") - } - fn get_version(&self) -> Version { - Version::from("0.0.0").unwrap() - } - fn get_status(&self) -> InterpretStatus { - InterpretStatus::QUEUED - } - fn get_children(&self) -> Vec { - vec![] - } - } - - /// A probe whose every attempt is hardcoded. Lets us script - /// "this stage passes" or "this stage rejects" in tests. - #[derive(Debug)] - struct Always { - name: ProbeName, - attempt: ProbeAttempt, - } - impl Always { - fn new(name: &str, attempt: ProbeAttempt) -> Self { - Self { - name: ProbeName::try_new(name).unwrap(), - attempt, - } - } - } - #[async_trait] - impl Probe for Always { - fn name(&self) -> &ProbeName { - &self.name - } - async fn check(&self, _t: &T) -> ProbeAttempt { - self.attempt.clone() - } - } - - /// The shape we expect every fleet smoke test to follow: it - /// owns no state beyond what the Score gives it. - #[derive(Debug)] - struct TrivialSmoke { - outcomes: Mutex>, - } - impl TrivialSmoke { - fn new(outcomes: Vec) -> Self { - Self { - outcomes: Mutex::new(outcomes), - } - } - } - - #[async_trait] - impl SmokeTest for TrivialSmoke { - type Score = TrivialScore; - async fn assemble( - &self, - score: &Self::Score, - _topology: &T, - ) -> Result, SmokeAssemblyError> { - let mut suite = SmokeSuite::new(); - for (i, attempt) in self.outcomes.lock().unwrap().iter().enumerate() { - let name = format!("{}-probe-{i}", score.label); - suite = suite.stage(Always::new(&name, attempt.clone()), RetryPolicy::once()); - } - Ok(suite) - } - } - - // -- the actual contract tests -------------------------------------------- - - fn inv() -> Inventory { - Inventory::empty() - } - - #[tokio::test] - async fn deploy_without_smoke_returns_outcome_and_no_report() { - let s = TrivialScore { - label: "alpha".to_string(), - should_fail: false, - }; - let r = deploy(s, &inv(), &NoopTopology).await.expect("deploy ok"); - assert_eq!(r.interpret.status, InterpretStatus::SUCCESS); - assert!( - r.smoke.is_none(), - "no-smoke variant must not synthesize a report" - ); - } - - #[tokio::test] - async fn interpret_failure_short_circuits_before_smoke_assembly() { - // If interpret fails, we MUST NOT call `smoke.assemble` — - // there is nothing deployed to verify, and running smoke - // would produce confusing cascade failures in the report. - let s = TrivialScore { - label: "beta".to_string(), - should_fail: true, - }; - // Use a smoke that would panic if assembled — proves we - // never got there. - struct PanicSmoke; - #[async_trait] - impl SmokeTest for PanicSmoke { - type Score = TrivialScore; - async fn assemble( - &self, - _: &Self::Score, - _: &T, - ) -> Result, SmokeAssemblyError> { - panic!("assemble must not be called when interpret fails"); - } - } - match deploy_with_smoke(s, PanicSmoke, &inv(), &NoopTopology).await { - Err(DeployError::Interpret(e)) => { - assert!(format!("{e}").contains("scripted failure")); - } - other => panic!("expected Interpret error, got {other:?}"), - } - } - - #[tokio::test] - async fn all_probes_pass_yields_deployoutcome_with_report() { - let s = TrivialScore { - label: "gamma".to_string(), - should_fail: false, - }; - let smoke = TrivialSmoke::new(vec![ - ProbeAttempt::Ok(Some("first stage ok".to_string())), - ProbeAttempt::Ok(None), - ]); - let r = deploy_with_smoke(s, smoke, &inv(), &NoopTopology) - .await - .expect("deploy + smoke ok"); - let report = r.smoke.expect("smoke report present when smoke runs"); - assert!(report.passed()); - assert_eq!(report.probes.len(), 2); - } - - #[tokio::test] - async fn probe_failure_blocks_deploy() { - let s = TrivialScore { - label: "delta".to_string(), - should_fail: false, - }; - let smoke = TrivialSmoke::new(vec![ - ProbeAttempt::Ok(None), - ProbeAttempt::Fatal("definitely broken".to_string()), - ]); - match deploy_with_smoke(s, smoke, &inv(), &NoopTopology).await { - Err(DeployError::SmokeFailed { interpret, report }) => { - // The Score itself succeeded — that's preserved, so - // the dashboard can show "deploy converged, smoke - // disagrees". - assert_eq!(interpret.status, InterpretStatus::SUCCESS); - assert!(!report.passed()); - assert_eq!(report.failed().count(), 1); - assert_eq!(report.probes.last().unwrap().name.as_str(), "delta-probe-1"); - } - other => panic!("expected SmokeFailed, got {other:?}"), - } - } -} diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/mod.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/mod.rs deleted file mode 100644 index 35bd92dd..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/mod.rs +++ /dev/null @@ -1,62 +0,0 @@ -//! Smoke-test contract — the second Score companion (ADR-023 P7, -//! ADR-023 P4 "deploy returns only after smoke-test success"). -//! -//! ## The big picture -//! -//! ```text -//! ┌────────────┐ pair ┌──────────────┐ assemble ┌────────────┐ -//! │ *Score │◀─type─────│ SmokeTest │─────────────▶│ SmokeSuite │ -//! └────────────┘ locked └──────────────┘ └────────────┘ -//! ▲ │ -//! │ │ -//! deploy_with_smoke ─── interpret ─── then ── run probes ──▶ SmokeReport -//! ``` -//! -//! - [`Probe`] is the single-attempt unit, classifying its -//! observation as [`Ok`/`Retry`/`Fatal`](probe::ProbeAttempt). It's -//! also the stage the dashboard renders. -//! - [`SmokeSuite`] is an ordered sequence of probes with per-stage -//! [`RetryPolicy`]. Sequential by design; parallel stages deferred. -//! - [`SmokeTest`] pairs a [`Score`](harmony::score::Score) with a -//! suite by associated type — a `SmokeTest` -//! cannot be passed to `deploy_with_smoke(DnsScore, …)` because the -//! compiler refuses the mismatch. -//! - [`deploy`] / [`deploy_with_smoke`] are free functions that -//! bind a Score's interpret to an optional SmokeTest. Returns -//! only after the smoke run completes — see -//! [`DeployError::SmokeFailed`] for the failure shape. -//! -//! ## Where this lives -//! -//! Inside `harmony-fleet-deploy` for Phase 0, alongside the -//! existing [`AgentObservation`](super::AgentObservation) companion. -//! When a second consumer (PostgreSQL deploy, monitoring stack) -//! adopts this contract, the module promotes to a top-level -//! `harmony-smoke` crate with no API churn — the move is mechanical -//! because nothing in here depends on fleet types. -//! -//! ## What this does NOT do -//! -//! - **No new `HarmonyEvent` variants in Phase 0.** Stage progress -//! is emitted via `tracing::info!` events inside `info_span!`s. -//! A later PR can add `HarmonyEvent::SmokeStage{Started,Finished}` -//! if the dashboard wants the typed seam. -//! - **No `Score`/`Interpret` trait edits.** Per ADR-023 P7. The -//! smoke seam is purely companion-shaped. -//! - **No CLI flag yet.** Phase 1 wires `harmony-fleet-deploy -//! --no-smoke` once a real `FleetOperatorSmokeTest` exists. - -pub mod contract; -pub mod deploy; -pub mod probe; -pub mod probes; -pub mod suite; - -pub use contract::{SmokeAssemblyError, SmokeTest}; -pub use deploy::{DeployError, DeployOutcome, deploy, deploy_with_smoke}; -pub use probe::{ - InvalidProbeName, Probe, ProbeAttempt, ProbeFailure, ProbeName, ProbeOutcome, RetryPolicy, - run_probe, -}; -pub use probes::TcpReachable; -pub use suite::{ProbeReport, SmokeReport, SmokeSuite}; diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/probe.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/probe.rs deleted file mode 100644 index 2a4a0e5e..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/probe.rs +++ /dev/null @@ -1,515 +0,0 @@ -//! Probe contract — the single-attempt unit of a smoke run. -//! -//! A [`Probe`] is what gets visualized as one stage of the smoke -//! pipeline. Each implementation answers one question against a -//! deployed system: "is this port reachable", "did this pod become -//! ready", "is this KV key set to the value we expect". -//! -//! The retry loop is **not** a probe responsibility. A probe's -//! [`check`](Probe::check) returns one of three classifications via -//! [`ProbeAttempt`] — `Ok`, `Retry`, or `Fatal` — and [`run_probe`] -//! drives the polling timer + timeout budget around it. This keeps -//! probes single-responsibility and makes their unit tests -//! deterministic. -//! -//! Each public type here is cardinality-matched to the concept it -//! represents (see ADR-024 §2 and JG's *For the Love of Compilers* -//! talk). `ProbeName` is a validated newtype, not `String`, because -//! "an empty probe name" or "a probe name with a newline in it" -//! cannot mean anything useful and would corrupt the dashboard -//! rendering. `ProbeAttempt` is a three-arm sum, not `Result<(), Error>`, -//! because "not ready yet" and "definitively no" must drive different -//! orchestration paths. - -use std::fmt; -use std::time::{Duration, Instant}; - -use async_trait::async_trait; -use thiserror::Error; -use tracing::{debug, warn}; - -use harmony::topology::Topology; - -/// One attempted check by a probe. -/// -/// The probe classifies its own observation rather than letting the -/// orchestrator guess: a 5xx response is fundamentally different from -/// a connection refused, even if both look like "failure" to a naive -/// caller. The orchestrator ([`run_probe`]) uses this classification -/// to decide whether to keep polling or bail out. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ProbeAttempt { - /// The probe's invariant holds *right now*. Optional detail is - /// surfaced in the smoke report (e.g. "HTTP 200 in 12 ms", - /// "found 3 ready pods"). - Ok(Option), - /// The probe didn't observe what it's looking for yet, but the - /// state is consistent with eventual success. Keep polling until - /// the timeout. The string is the most recent observation, kept - /// so that a final timeout can quote the last thing we saw. - Retry(String), - /// The probe got a definitive negative answer — no amount of - /// further polling will change it. Skip the rest of the budget - /// and report failure now. Example: HTTP 404 from a URL that - /// must exist after deploy, or a `Service` with the wrong - /// selector. - Fatal(String), -} - -/// Result of a full probe run after [`run_probe`] has consumed the -/// retry budget (or short-circuited). -/// -/// This is the per-probe shape that lands in -/// [`crate::companion::smoke::SmokeReport`]. The orchestrator wraps -/// it with the [`ProbeName`] and the elapsed time so the dashboard -/// can render each stage independently. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ProbeOutcome { - /// At least one attempt returned [`ProbeAttempt::Ok`] within the - /// retry budget. - Pass { - elapsed: Duration, - attempts: u32, - detail: Option, - }, - /// Either we exhausted the budget with only `Retry`s (timeout) - /// or a single `Fatal` short-circuited us (rejected). - Fail { - elapsed: Duration, - attempts: u32, - failure: ProbeFailure, - }, -} - -impl ProbeOutcome { - pub fn passed(&self) -> bool { - matches!(self, Self::Pass { .. }) - } -} - -/// Why a probe failed. Two arms, deliberately — they map to two -/// different operator actions: "I waited and nothing happened" needs -/// a longer budget or a stuck-component investigation; "something -/// gave me a hard no" needs a configuration fix. -#[derive(Debug, Clone, PartialEq, Eq, Error)] -pub enum ProbeFailure { - #[error("timed out after {budget:?} (last observation: {last_observation})")] - Timeout { - budget: Duration, - last_observation: String, - }, - #[error("rejected: {detail}")] - Rejected { detail: String }, -} - -/// Polling parameters for one probe. -/// -/// `interval` is the wait between attempts. `timeout` is the total -/// wall-clock budget from the first attempt to the last; the loop -/// will not start a new attempt past this budget but will let an -/// in-flight one finish. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct RetryPolicy { - interval: Duration, - timeout: Duration, -} - -impl RetryPolicy { - /// Standard polling shape: try, wait `interval`, try again, until - /// either an `Ok`/`Fatal` happens or `timeout` elapses. - /// - /// Panics if `interval` is zero — a zero interval combined with - /// any non-trivial check would spin the executor and produce no - /// useful retry semantics. Express "run once" via [`Self::once`] - /// instead. - pub fn polling(interval: Duration, timeout: Duration) -> Self { - assert!( - !interval.is_zero(), - "RetryPolicy::polling requires a non-zero interval; use RetryPolicy::once for a single attempt", - ); - Self { interval, timeout } - } - - /// Single-shot: one attempt, no retries. Useful for probes whose - /// success criterion is binary the moment we check (e.g. "is - /// this string equal to that string"). - pub fn once() -> Self { - Self { - interval: Duration::from_millis(1), - timeout: Duration::ZERO, - } - } - - pub fn interval(&self) -> Duration { - self.interval - } - - pub fn timeout(&self) -> Duration { - self.timeout - } -} - -/// A validated probe name. -/// -/// Probe names appear in `tracing` events, error messages, the smoke -/// report, and the eventual dashboard. Constraining them at -/// construction means the dashboard never has to defend against -/// surprising bytes (control chars, newlines, etc). -#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct ProbeName(String); - -#[derive(Debug, Error, PartialEq, Eq)] -pub enum InvalidProbeName { - #[error("probe name must not be empty")] - Empty, - #[error("probe name must not exceed 128 bytes (got {0})")] - TooLong(usize), - #[error("probe name must not contain control characters")] - ControlChar, -} - -impl ProbeName { - pub fn try_new(s: impl Into) -> Result { - let s = s.into(); - if s.is_empty() { - return Err(InvalidProbeName::Empty); - } - if s.len() > 128 { - return Err(InvalidProbeName::TooLong(s.len())); - } - if s.chars().any(|c| c.is_control()) { - return Err(InvalidProbeName::ControlChar); - } - Ok(Self(s)) - } - - pub fn as_str(&self) -> &str { - &self.0 - } -} - -impl fmt::Display for ProbeName { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&self.0) - } -} - -/// One pipeline stage of a [`SmokeSuite`](super::SmokeSuite). -/// -/// Object-safe so a suite can hold a heterogeneous `Vec>>`. The associated retry behavior is supplied separately -/// by the suite (via [`RetryPolicy`]) so that the same probe type -/// can be reused with different budgets in different suites. -#[async_trait] -pub trait Probe: Send + Sync + std::fmt::Debug { - fn name(&self) -> &ProbeName; - async fn check(&self, topology: &T) -> ProbeAttempt; -} - -/// Drive a probe through its retry policy and return the consolidated -/// outcome. -/// -/// Behavior: -/// -/// - Every `interval` we run `probe.check(...)`. A successful attempt -/// wins immediately; a `Fatal` short-circuits to `Rejected`; a -/// `Retry` records its observation and we wait. -/// - If `timeout` is `Duration::ZERO` we run exactly one attempt -/// (the "once" shape). -/// - Otherwise we keep attempting until the elapsed wall time -/// exceeds `timeout`. The last `Retry` observation becomes the -/// `Timeout::last_observation` field so the failure message tells -/// the operator what we kept seeing. -/// -/// The function emits a `tracing` event per attempt at `debug` level -/// and one at `warn` for the final failure (so a Phase-1 dashboard -/// can subscribe via a `tracing` layer without us needing to add a -/// new `HarmonyEvent` variant yet). -pub async fn run_probe(probe: &P, topology: &T, policy: RetryPolicy) -> ProbeOutcome -where - T: Topology, - P: Probe + ?Sized, -{ - let start = Instant::now(); - let mut attempts: u32 = 0; - - loop { - attempts += 1; - // Each iteration produces either an early return (Ok / Fatal) - // or the most recent Retry observation. Threading that - // observation back into the next iteration only happens - // implicitly via the timeout check below, so there's no - // long-lived `last_observation` to lint over. - let last_observation = match probe.check(topology).await { - ProbeAttempt::Ok(detail) => { - debug!( - probe = %probe.name(), - attempts, - elapsed_ms = start.elapsed().as_millis() as u64, - "probe passed", - ); - return ProbeOutcome::Pass { - elapsed: start.elapsed(), - attempts, - detail, - }; - } - ProbeAttempt::Fatal(detail) => { - warn!( - probe = %probe.name(), - attempts, - %detail, - "probe rejected (fatal)", - ); - return ProbeOutcome::Fail { - elapsed: start.elapsed(), - attempts, - failure: ProbeFailure::Rejected { detail }, - }; - } - ProbeAttempt::Retry(obs) => { - debug!( - probe = %probe.name(), - attempts, - observation = %obs, - "probe retry", - ); - obs - } - }; - - if start.elapsed() >= policy.timeout { - warn!( - probe = %probe.name(), - attempts, - budget_ms = policy.timeout.as_millis() as u64, - last_observation = %last_observation, - "probe timed out", - ); - return ProbeOutcome::Fail { - elapsed: start.elapsed(), - attempts, - failure: ProbeFailure::Timeout { - budget: policy.timeout, - last_observation, - }, - }; - } - - tokio::time::sleep(policy.interval).await; - } -} - -#[cfg(test)] -mod tests { - use super::*; - use harmony::topology::PreparationOutcome; - use std::sync::Mutex; - use std::sync::atomic::{AtomicU32, Ordering}; - - // -- ProbeName ----------------------------------------------------------- - - #[test] - fn probe_name_accepts_normal_strings() { - assert!(ProbeName::try_new("nats-reachable").is_ok()); - assert!(ProbeName::try_new("HTTP / health").is_ok()); - } - - #[test] - fn probe_name_rejects_empty() { - assert_eq!(ProbeName::try_new(""), Err(InvalidProbeName::Empty)); - } - - #[test] - fn probe_name_rejects_control_chars() { - // Dashboard renders these as garbage — fail at construction. - assert_eq!( - ProbeName::try_new("bad\nname"), - Err(InvalidProbeName::ControlChar) - ); - } - - #[test] - fn probe_name_rejects_overlong() { - let long = "x".repeat(129); - assert_eq!( - ProbeName::try_new(long.clone()), - Err(InvalidProbeName::TooLong(129)) - ); - } - - // -- RetryPolicy --------------------------------------------------------- - - #[test] - fn retry_policy_once_is_single_attempt() { - let p = RetryPolicy::once(); - assert_eq!(p.timeout(), Duration::ZERO); - } - - #[test] - #[should_panic(expected = "non-zero interval")] - fn retry_policy_polling_rejects_zero_interval() { - // Zero interval + non-trivial check is a spin loop. The - // type system can't catch this, so a runtime assert at - // construction is the next best thing — fail loud, fail - // early, not on the operator's deploy log. - let _ = RetryPolicy::polling(Duration::ZERO, Duration::from_secs(5)); - } - - // -- run_probe orchestration -------------------------------------------- - - /// Test probe that returns a scripted sequence of attempts, then - /// loops forever on the last one. Lets us simulate - /// "retry-then-ok" without real network I/O. - #[derive(Debug)] - struct ScriptedProbe { - name: ProbeName, - script: Mutex>, - calls: AtomicU32, - } - - impl ScriptedProbe { - fn new(name: &str, script: Vec) -> Self { - Self { - name: ProbeName::try_new(name).unwrap(), - script: Mutex::new(script), - calls: AtomicU32::new(0), - } - } - } - - #[async_trait] - impl Probe for ScriptedProbe { - fn name(&self) -> &ProbeName { - &self.name - } - async fn check(&self, _t: &T) -> ProbeAttempt { - self.calls.fetch_add(1, Ordering::SeqCst); - let mut s = self.script.lock().unwrap(); - if s.len() == 1 { - s[0].clone() - } else { - s.remove(0) - } - } - } - - #[derive(Debug)] - struct NoopTopology; - - #[async_trait] - impl Topology for NoopTopology { - fn name(&self) -> &str { - "noop" - } - async fn ensure_ready( - &self, - ) -> Result { - Ok(PreparationOutcome::Noop) - } - } - - #[tokio::test] - async fn ok_on_first_attempt_yields_pass() { - let probe = ScriptedProbe::new("instant", vec![ProbeAttempt::Ok(Some("hi".to_string()))]); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(1), Duration::from_millis(50)), - ) - .await; - match outcome { - ProbeOutcome::Pass { - attempts, detail, .. - } => { - assert_eq!(attempts, 1); - assert_eq!(detail.as_deref(), Some("hi")); - } - other => panic!("expected Pass, got {other:?}"), - } - } - - #[tokio::test] - async fn retries_until_ok_within_budget() { - let probe = ScriptedProbe::new( - "eventual", - vec![ - ProbeAttempt::Retry("not yet (1)".to_string()), - ProbeAttempt::Retry("not yet (2)".to_string()), - ProbeAttempt::Ok(None), - ], - ); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(2), Duration::from_millis(500)), - ) - .await; - match outcome { - ProbeOutcome::Pass { attempts, .. } => assert_eq!(attempts, 3), - other => panic!("expected Pass after retries, got {other:?}"), - } - } - - #[tokio::test] - async fn fatal_short_circuits_without_consuming_full_budget() { - let probe = ScriptedProbe::new( - "definite-no", - vec![ProbeAttempt::Fatal("wrong selector".to_string())], - ); - let start = Instant::now(); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(5), Duration::from_secs(10)), - ) - .await; - // The whole point of Fatal is that we don't burn the budget. - // Generous upper bound — CI variance, not a real check. - assert!( - start.elapsed() < Duration::from_secs(1), - "Fatal should return immediately, took {:?}", - start.elapsed() - ); - match outcome { - ProbeOutcome::Fail { - failure: ProbeFailure::Rejected { detail }, - .. - } => assert_eq!(detail, "wrong selector"), - other => panic!("expected Rejected, got {other:?}"), - } - } - - #[tokio::test] - async fn always_retry_yields_timeout_with_last_observation() { - let probe = ScriptedProbe::new( - "never-ready", - vec![ProbeAttempt::Retry("still pending".to_string())], - ); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(5), Duration::from_millis(40)), - ) - .await; - match outcome { - ProbeOutcome::Fail { - failure: - ProbeFailure::Timeout { - budget, - last_observation, - }, - attempts, - .. - } => { - assert_eq!(budget, Duration::from_millis(40)); - assert_eq!(last_observation, "still pending"); - assert!( - attempts >= 2, - "expected at least two attempts, got {attempts}" - ); - } - other => panic!("expected Timeout, got {other:?}"), - } - } -} diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/probes/mod.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/probes/mod.rs deleted file mode 100644 index 8fe2e1a9..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/probes/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! First-party [`Probe`](super::Probe) implementations. -//! -//! Phase 0 ships exactly one — [`TcpReachable`] — to validate the -//! trait shape against real network I/O. Phase 1 adds HTTP, K8s -//! pod-ready, NATS KV. Each future probe is one self-contained -//! file under this module. -//! -//! Probes that need their own crate dependencies (e.g. a future -//! `HelmReleaseHealthy` that talks to the helm client) can be -//! gated behind a Cargo feature here without touching the core -//! smoke types. - -pub mod tcp; - -pub use tcp::TcpReachable; diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/probes/tcp.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/probes/tcp.rs deleted file mode 100644 index ce1dc7bc..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/probes/tcp.rs +++ /dev/null @@ -1,225 +0,0 @@ -//! `TcpReachable` — the smallest useful probe: open a TCP connection -//! to `host:port` within a timeout. -//! -//! Used as the first stage of practically every fleet smoke suite: -//! "before checking the operator's `/healthz`, prove the cluster IP -//! even routes". A connect-refused is a `Retry` (we may have caught -//! the service mid-startup); a DNS or address-syntax error is -//! `Fatal` (no amount of polling will fix it). -//! -//! Implemented with `tokio::net::TcpStream`. No new crate -//! dependencies — `tokio` is already pulled in with the `full` -//! feature by this crate's `Cargo.toml`. - -use std::io; -use std::time::Duration; - -use async_trait::async_trait; -use tokio::net::TcpStream; - -use harmony::topology::Topology; - -use crate::companion::smoke::probe::{Probe, ProbeAttempt, ProbeName}; - -/// Probe that connects to `host:port` and immediately drops the -/// socket. Pass on `connect succeeded`. -#[derive(Debug, Clone)] -pub struct TcpReachable { - name: ProbeName, - address: String, - connect_timeout: Duration, -} - -impl TcpReachable { - /// Default connect timeout is 1s — short enough that a polling - /// retry sees several attempts; long enough that a slow host - /// doesn't false-fail. Override with - /// [`Self::with_connect_timeout`]. - pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(1); - - pub fn new(name: ProbeName, address: impl Into) -> Self { - Self { - name, - address: address.into(), - connect_timeout: Self::DEFAULT_CONNECT_TIMEOUT, - } - } - - pub fn with_connect_timeout(mut self, t: Duration) -> Self { - self.connect_timeout = t; - self - } - - pub fn address(&self) -> &str { - &self.address - } -} - -#[async_trait] -impl Probe for TcpReachable { - fn name(&self) -> &ProbeName { - &self.name - } - - async fn check(&self, _topology: &T) -> ProbeAttempt { - let connect = TcpStream::connect(self.address.as_str()); - match tokio::time::timeout(self.connect_timeout, connect).await { - Ok(Ok(_stream)) => ProbeAttempt::Ok(Some(format!("connected to {}", self.address))), - Ok(Err(e)) => classify_connect_error(&self.address, e), - Err(_elapsed) => ProbeAttempt::Retry(format!( - "connect to {} timed out after {:?}", - self.address, self.connect_timeout - )), - } - } -} - -/// Most connect errors look like "not ready yet" — we want to keep -/// polling. A small set is definitive and should short-circuit the -/// retry budget. `InvalidInput` and `Unsupported` both mean the -/// address is unparseable / unroutable on this host — no number of -/// retries will fix that. -fn classify_connect_error(address: &str, e: io::Error) -> ProbeAttempt { - match e.kind() { - io::ErrorKind::InvalidInput => { - ProbeAttempt::Fatal(format!("invalid address {address}: {e}")) - } - io::ErrorKind::Unsupported => { - ProbeAttempt::Fatal(format!("unsupported address {address}: {e}")) - } - // ConnectionRefused / ConnectionReset / TimedOut / - // HostUnreachable / NetworkUnreachable / NotFound (rare) — - // all consistent with "service not up yet". Keep polling. - _ => ProbeAttempt::Retry(format!("connect to {address}: {e}")), - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::companion::smoke::probe::{ProbeFailure, ProbeOutcome, RetryPolicy, run_probe}; - use async_trait::async_trait; - use harmony::topology::PreparationOutcome; - use tokio::net::TcpListener; - - #[derive(Debug)] - struct NoopTopology; - - #[async_trait] - impl Topology for NoopTopology { - fn name(&self) -> &str { - "noop" - } - async fn ensure_ready( - &self, - ) -> Result { - Ok(PreparationOutcome::Noop) - } - } - - fn name(s: &str) -> ProbeName { - ProbeName::try_new(s).unwrap() - } - - #[tokio::test] - async fn passes_against_a_bound_listener() { - // Bind to a kernel-assigned port and prove the probe sees - // the listener. Using 127.0.0.1 keeps this hermetic — no - // outbound network, no flakiness from name resolution. - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - let probe = TcpReachable::new(name("local-listener"), addr.to_string()); - match run_probe(&probe, &NoopTopology, RetryPolicy::once()).await { - ProbeOutcome::Pass { attempts, .. } => assert_eq!(attempts, 1), - other => panic!("expected pass, got {other:?}"), - } - } - - #[tokio::test] - async fn retries_then_passes_when_listener_appears_late() { - // Pick a free port, then race: probe runs immediately, then - // we bind 30 ms later. The polling retry should catch it. - let pre = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = pre.local_addr().unwrap(); - drop(pre); // free the port - - let probe = TcpReachable::new(name("late-listener"), addr.to_string()) - .with_connect_timeout(Duration::from_millis(50)); - - let (probe_result, _) = tokio::join!( - run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(10), Duration::from_secs(2)), - ), - async { - tokio::time::sleep(Duration::from_millis(40)).await; - let listener = TcpListener::bind(addr).await.expect("rebind"); - // Hold the listener until the probe is satisfied. - tokio::time::sleep(Duration::from_millis(500)).await; - drop(listener); - } - ); - - match probe_result { - ProbeOutcome::Pass { attempts, .. } => { - assert!( - attempts >= 2, - "expected at least 2 attempts, got {attempts}" - ); - } - other => panic!("expected eventual pass, got {other:?}"), - } - } - - #[tokio::test] - async fn times_out_on_unused_port() { - // Bind+drop to confirm the port is free in the kernel's - // sense, then probe against it with a tight budget. With - // nothing listening, the OS returns ECONNREFUSED, which we - // classify as Retry → budget expiry → Timeout. - let pre = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = pre.local_addr().unwrap(); - drop(pre); - - let probe = TcpReachable::new(name("nothing-here"), addr.to_string()) - .with_connect_timeout(Duration::from_millis(50)); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(20), Duration::from_millis(100)), - ) - .await; - match outcome { - ProbeOutcome::Fail { - failure: ProbeFailure::Timeout { .. }, - .. - } => {} - other => panic!("expected Timeout, got {other:?}"), - } - } - - #[tokio::test] - async fn fatal_on_unparseable_address() { - // Connect to a syntactically invalid address. No amount of - // retrying will help; we should short-circuit to Fatal. - let probe = TcpReachable::new(name("bad-addr"), "not-an-address"); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(20), Duration::from_secs(5)), - ) - .await; - match outcome { - ProbeOutcome::Fail { - failure: ProbeFailure::Rejected { detail }, - attempts, - .. - } => { - assert!(detail.contains("not-an-address")); - assert_eq!(attempts, 1, "Fatal must short-circuit on attempt 1"); - } - other => panic!("expected Rejected, got {other:?}"), - } - } -} diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/suite.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/suite.rs deleted file mode 100644 index a4de34fa..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/suite.rs +++ /dev/null @@ -1,287 +0,0 @@ -//! Smoke suite — an ordered sequence of [`Probe`]s with per-probe -//! retry budgets, and the report that comes back from running it. -//! -//! Suites are sequential by design in Phase 0. The mental model is -//! "first prove the basic connectivity, then prove the derived -//! state" — which is inherently ordered. Parallel stages are a -//! deferred extension; the public API never exposed sequentiality -//! as a guarantee, so adding `parallel_stage` later is additive. -//! -//! A [`SmokeReport`] is a pure value — easy to log, easy to -//! serialize for the dashboard, easy to assert against in tests. It -//! deliberately mirrors the -//! [`Outcome`](harmony::interpret::Outcome)/[`InterpretError`](harmony::interpret::InterpretError) -//! split that the rest of the framework uses: one type per "what -//! happened", not a Result soup. - -use std::time::{Duration, Instant}; - -use harmony::topology::Topology; -use tracing::{info, info_span}; - -use super::probe::{Probe, ProbeName, ProbeOutcome, RetryPolicy, run_probe}; - -/// One stage in a [`SmokeSuite`]: a probe plus the retry budget that -/// applies to *this* invocation. -struct SmokeStage { - probe: Box>, - policy: RetryPolicy, -} - -/// An ordered, retry-aware composition of probes for one -/// [`SmokeTest`](super::SmokeTest). -/// -/// Built via [`SmokeSuite::new`] + [`SmokeSuite::stage`]. Run via -/// [`SmokeSuite::run`]. The same probe type can appear multiple -/// times with different policies — that's the point of supplying -/// the policy at the stage level, not on the probe itself. -pub struct SmokeSuite { - stages: Vec>, -} - -impl Default for SmokeSuite { - fn default() -> Self { - Self::new() - } -} - -impl std::fmt::Debug for SmokeSuite { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SmokeSuite") - .field( - "stages", - &self - .stages - .iter() - .map(|s| s.probe.name().as_str()) - .collect::>(), - ) - .finish() - } -} - -impl SmokeSuite { - pub fn new() -> Self { - Self { stages: Vec::new() } - } - - /// Append a stage. Builder-style so an `assemble` method reads - /// like a checklist: - /// - /// ```ignore - /// SmokeSuite::new() - /// .stage(TcpReachable::new(name, "nats:4222"), RetryPolicy::polling(...)) - /// .stage(HttpHealthy::new(name, "/healthz"), RetryPolicy::polling(...)) - /// ``` - pub fn stage

(mut self, probe: P, policy: RetryPolicy) -> Self - where - P: Probe + 'static, - { - self.stages.push(SmokeStage { - probe: Box::new(probe) as Box>, - policy, - }); - self - } - - pub fn len(&self) -> usize { - self.stages.len() - } - - pub fn is_empty(&self) -> bool { - self.stages.is_empty() - } - - /// Drive every stage in declared order. Stops on the first - /// failure — a smoke test is a chain of preconditions, so - /// continuing past a failure rarely produces useful information - /// and risks flooding the dashboard with cascade failures. - pub async fn run(self, topology: &T) -> SmokeReport { - let span = info_span!("smoke_suite", probes = self.stages.len()); - let _guard = span.enter(); - - let start = Instant::now(); - let mut probes = Vec::with_capacity(self.stages.len()); - - for stage in self.stages { - let name = stage.probe.name().clone(); - info!(probe = %name, "smoke stage started"); - let outcome = run_probe(&*stage.probe, topology, stage.policy).await; - let passed = outcome.passed(); - info!( - probe = %name, - passed, - elapsed_ms = elapsed_ms(&outcome), - "smoke stage finished", - ); - probes.push(ProbeReport { name, outcome }); - if !passed { - break; - } - } - - SmokeReport { - probes, - elapsed: start.elapsed(), - } - } -} - -fn elapsed_ms(outcome: &ProbeOutcome) -> u64 { - let d = match outcome { - ProbeOutcome::Pass { elapsed, .. } | ProbeOutcome::Fail { elapsed, .. } => *elapsed, - }; - d.as_millis() as u64 -} - -/// What one probe stage produced. The `name` is repeated here (also -/// owned by the probe itself) so a `SmokeReport` is fully -/// self-contained — the probe `Box` is dropped after the suite runs. -#[derive(Debug, Clone)] -pub struct ProbeReport { - pub name: ProbeName, - pub outcome: ProbeOutcome, -} - -impl ProbeReport { - pub fn passed(&self) -> bool { - self.outcome.passed() - } -} - -/// What [`SmokeSuite::run`] returns. The dashboard reads probes top -/// to bottom; the orchestrator reads `passed()` to gate the deploy. -#[derive(Debug, Clone)] -pub struct SmokeReport { - pub probes: Vec, - pub elapsed: Duration, -} - -impl SmokeReport { - /// `true` iff every probe in the suite passed. - /// - /// An empty suite passes — there's nothing to fail. Callers who - /// want "a smoke test ran" semantics should check `!is_empty()` - /// in addition. - pub fn passed(&self) -> bool { - self.probes.iter().all(ProbeReport::passed) - } - - pub fn is_empty(&self) -> bool { - self.probes.is_empty() - } - - /// Iterator over only the failed probes — convenient for - /// rendering "X failures of Y" without filtering at the call - /// site. - pub fn failed(&self) -> impl Iterator { - self.probes.iter().filter(|p| !p.passed()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::companion::smoke::probe::{ProbeAttempt, ProbeFailure}; - use async_trait::async_trait; - use harmony::topology::PreparationOutcome; - - #[derive(Debug)] - struct NoopTopology; - - #[async_trait] - impl Topology for NoopTopology { - fn name(&self) -> &str { - "noop" - } - async fn ensure_ready( - &self, - ) -> Result { - Ok(PreparationOutcome::Noop) - } - } - - #[derive(Debug)] - struct Always { - name: ProbeName, - attempt: ProbeAttempt, - } - - impl Always { - fn new(name: &str, attempt: ProbeAttempt) -> Self { - Self { - name: ProbeName::try_new(name).unwrap(), - attempt, - } - } - } - - #[async_trait] - impl Probe for Always { - fn name(&self) -> &ProbeName { - &self.name - } - async fn check(&self, _t: &T) -> ProbeAttempt { - self.attempt.clone() - } - } - - #[tokio::test] - async fn empty_suite_passes_vacuously() { - let report = SmokeSuite::::new().run(&NoopTopology).await; - assert!(report.passed()); - assert!(report.is_empty()); - } - - #[tokio::test] - async fn all_pass_stages_aggregates_pass() { - let report = SmokeSuite::::new() - .stage( - Always::new("a", ProbeAttempt::Ok(None)), - RetryPolicy::once(), - ) - .stage( - Always::new("b", ProbeAttempt::Ok(Some("ok".to_string()))), - RetryPolicy::once(), - ) - .run(&NoopTopology) - .await; - assert!(report.passed()); - assert_eq!(report.probes.len(), 2); - assert!(report.failed().count() == 0); - } - - #[tokio::test] - async fn first_failure_short_circuits_remaining_stages() { - // The third stage must never run. We assert by name — - // its presence in the report would prove a regression. - let report = SmokeSuite::::new() - .stage( - Always::new("a", ProbeAttempt::Ok(None)), - RetryPolicy::once(), - ) - .stage( - Always::new("b", ProbeAttempt::Fatal("nope".to_string())), - RetryPolicy::once(), - ) - .stage( - Always::new("c", ProbeAttempt::Ok(None)), - RetryPolicy::once(), - ) - .run(&NoopTopology) - .await; - assert!(!report.passed()); - assert_eq!(report.probes.len(), 2); - assert_eq!(report.probes[0].name.as_str(), "a"); - assert_eq!(report.probes[1].name.as_str(), "b"); - // Cascading failures are noise — confirm only the actual - // culprit appears. - match &report.probes[1].outcome { - ProbeOutcome::Fail { - failure: ProbeFailure::Rejected { detail }, - .. - } => assert_eq!(detail, "nope"), - other => panic!("expected Rejected, got {other:?}"), - } - } -} diff --git a/fleet/harmony-fleet-deploy/src/lib.rs b/fleet/harmony-fleet-deploy/src/lib.rs index 8499c804..3afbd654 100644 --- a/fleet/harmony-fleet-deploy/src/lib.rs +++ b/fleet/harmony-fleet-deploy/src/lib.rs @@ -25,6 +25,13 @@ //! companion seam from P7. First-party `TcpReachable` probe ships //! in this PR; HTTP/K8s/NATS probes and a real //! `FleetOperatorSmokeTest` follow in Phase 1. +//! - [`companion::logs`] — the log-tail contract. `LogQuery` +//! companion trait, `LogChunk` value, `PodmanLogQuery` NATS +//! request/reply implementation. Powers the dashboard's "View +//! logs" UX. Trait + transport-side impl + unit tests ship in +//! this PR; the agent-side `Verb::Logs` handler and the operator +//! dashboard handler (`/deployments//devices//logs`) +//! land in follow-up PRs against the contract locked here. //! //! Out of scope for now: //! @@ -42,6 +49,7 @@ pub mod server; pub use agent::{FleetAgentScore, PodTarget}; pub use companion::AgentObservation; +pub use companion::logs; pub use companion::smoke; pub use nats::{FleetNatsScore, UserPassCredentials}; pub use operator::{FleetOperatorScore, OperatorCredentials}; diff --git a/harmony-reconciler-contracts/src/commands.rs b/harmony-reconciler-contracts/src/commands.rs index 9d7d448b..e4adeb9e 100644 --- a/harmony-reconciler-contracts/src/commands.rs +++ b/harmony-reconciler-contracts/src/commands.rs @@ -15,6 +15,7 @@ //! messages on the inbox (streaming verbs set the `X-Harmony-Final` //! header on the last frame). +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use harmony_types::id::Id; @@ -42,6 +43,10 @@ pub const SUBJECT_PREFIX: &str = "device-commands"; #[serde(rename_all = "lowercase")] pub enum Verb { Ping, + /// Fetch the last N lines of a deployment's container logs from + /// the device. Single-shot reply; live tail (streaming) is a + /// later verb. + Logs, } impl Verb { @@ -51,6 +56,7 @@ impl Verb { pub fn as_subject_token(&self) -> &'static str { match self { Verb::Ping => "ping", + Verb::Logs => "logs", } } } @@ -67,12 +73,14 @@ pub fn device_command_subscription(device_id: &str) -> String { format!("{SUBJECT_PREFIX}.{device_id}.>") } -/// JSON body of an outbound command request. v1 carries only `Ping` -/// (which has no payload). Future verbs add their own struct + variant. +/// JSON body of an outbound command request. v1 carried only `Ping`; +/// `Logs` is the second verb. Future verbs add their own struct + +/// variant. #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "verb", rename_all = "lowercase")] pub enum CommandRequest { Ping, + Logs(LogsRequest), } /// JSON body of a `Verb::Ping` reply. @@ -83,6 +91,55 @@ pub struct PingReply { pub uptime_s: u64, } +/// JSON body of a `Verb::Logs` request. +/// +/// The deployment lives in the body (not in the subject) so the +/// agent's existing wildcard subscription +/// `device-commands..>` keeps working unchanged — the verb stays +/// the trailing token. `lines` is bounded at deserialization on the +/// agent side; values above [`LOGS_MAX_LINES`] are clamped down so a +/// hostile operator can't ask for an unbounded transfer. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct LogsRequest { + /// Which deployment on the target device to read logs from. + /// Validated by the typed [`crate::DeploymentName`] newtype. + pub deployment: crate::DeploymentName, + /// How many trailing lines to return. The agent clamps to + /// [`LOGS_MAX_LINES`] before invoking the runtime. + pub lines: u32, +} + +/// JSON body of a `Verb::Logs` reply. Mirrors +/// `harmony_fleet_deploy::companion::logs::LogChunk` on the wire — +/// kept here (not in the deploy crate) so the agent build, which +/// must not depend on harmony, can construct and serialize it. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct LogsReply { + /// `ProbeName`-equivalent identifier for the log stream. The deploy + /// crate wraps this back into a validated `ProbeName` on receipt. + /// On the wire it's a `String` so the agent crate stays free of + /// the smoke-companion's validated-newtype surface. + pub source: String, + /// When the agent captured the logs. The dashboard renders this + /// alongside the lines so the user knows the chunk is fresh + /// (not a stale cache). + pub captured_at: DateTime, + /// Tail lines, oldest-first, one container line per entry. Empty + /// when the container has produced no output yet. + pub lines: Vec, + /// `true` when the agent's runtime returned a stream that + /// exceeded the per-request size budget and we cut it. Surfacing + /// this lets the dashboard show a "showing last N of more" + /// hint instead of silently truncating. + pub truncated: bool, +} + +/// Hard upper bound on `LogsRequest::lines`. The agent clamps +/// requests above this. 1000 lines comfortably fits in a single +/// NATS message under the default 8 MiB max payload and matches the +/// `--tail` budget a typical operator wants from the dashboard. +pub const LOGS_MAX_LINES: u32 = 1000; + /// Stable error categories the agent reports on the reply payload /// when a verb can't be handled. The operator-side client maps these /// to its own typed error enum; everything else (no_responders, @@ -155,4 +212,68 @@ mod tests { let json = serde_json::to_string(&CommandRequest::Ping).unwrap(); assert_eq!(json, r#"{"verb":"ping"}"#); } + + #[test] + fn logs_subject_matches_documented_format() { + // Subject suffix is the verb token only — the deployment + // lives in the request body. Keeps the agent's existing + // wildcard subscription (`device-commands..>`) and + // permission template valid without change. + assert_eq!( + device_command_subject("pi-42", Verb::Logs), + "device-commands.pi-42.logs" + ); + } + + #[test] + fn logs_request_round_trips_through_json() { + let original = LogsRequest { + deployment: crate::DeploymentName::try_new("hello-web").expect("valid"), + lines: 100, + }; + let json = serde_json::to_vec(&original).unwrap(); + let back: LogsRequest = serde_json::from_slice(&json).unwrap(); + assert_eq!(back, original); + } + + #[test] + fn logs_reply_round_trips_through_json() { + let original = LogsReply { + source: "podman:hello-web".to_string(), + captured_at: chrono::DateTime::parse_from_rfc3339("2026-01-15T12:00:00Z") + .unwrap() + .with_timezone(&Utc), + lines: vec!["first".to_string(), "second".to_string()], + truncated: false, + }; + let json = serde_json::to_vec(&original).unwrap(); + let back: LogsReply = serde_json::from_slice(&json).unwrap(); + assert_eq!(back, original); + } + + #[test] + fn command_request_logs_tagged_form_is_stable() { + // Locking the wire shape: an older agent that learns about + // `Logs` must continue to recognize this exact bytes + // sequence as a Logs request. + let req = CommandRequest::Logs(LogsRequest { + deployment: crate::DeploymentName::try_new("hello-web").expect("valid"), + lines: 50, + }); + let json = serde_json::to_string(&req).unwrap(); + assert_eq!( + json, + r#"{"verb":"logs","deployment":"hello-web","lines":50}"# + ); + } + + #[test] + fn logs_max_lines_is_a_sane_budget() { + // The constant exists; assert it's not zero (which would + // make every legal request return no lines) and not so + // huge that a single NATS message would exceed the default + // server payload limit. + assert!(LOGS_MAX_LINES > 0); + assert!(LOGS_MAX_LINES <= 10_000); + } } diff --git a/harmony-reconciler-contracts/src/lib.rs b/harmony-reconciler-contracts/src/lib.rs index e1c6cba2..46ba4cdb 100644 --- a/harmony-reconciler-contracts/src/lib.rs +++ b/harmony-reconciler-contracts/src/lib.rs @@ -23,8 +23,8 @@ pub mod status; pub use commands::{ CommandRequest, ErrorKind, ErrorReply, HDR_DEADLINE, HDR_FINAL, HDR_OPERATOR_SUB, - HDR_REQUEST_ID, PingReply, SUBJECT_PREFIX, Verb, device_command_subject, - device_command_subscription, + HDR_REQUEST_ID, LOGS_MAX_LINES, LogsReply, LogsRequest, PingReply, SUBJECT_PREFIX, Verb, + device_command_subject, device_command_subscription, }; pub use fleet::{ DeploymentName, DeploymentState, DeviceInfo, HeartbeatPayload, InvalidDeploymentName,