feat(fleet-deploy): log-tail contract as a Score companion #295
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -4086,7 +4086,9 @@ name = "harmony-fleet-deploy"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-nats",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"clap",
|
||||
"env_logger",
|
||||
"harmony",
|
||||
|
||||
@@ -25,7 +25,9 @@ harmony-fleet-auth = { path = "../harmony-fleet-auth" }
|
||||
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
|
||||
|
||||
anyhow = { workspace = true }
|
||||
async-nats = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
clap = { workspace = true }
|
||||
k8s-openapi = { workspace = true }
|
||||
kube = { workspace = true, features = ["runtime", "derive"] }
|
||||
|
||||
220
fleet/harmony-fleet-deploy/src/companion/logs/chunk.rs
Normal file
220
fleet/harmony-fleet-deploy/src/companion/logs/chunk.rs
Normal file
@@ -0,0 +1,220 @@
|
||||
//! The value a [`LogQuery`](super::LogQuery) returns: a chunk of log
|
||||
//! lines with the metadata the dashboard needs to render it honestly.
|
||||
//!
|
||||
//! Kept as a pure data type — no transport, no async, no IO. The
|
||||
//! transport layer (the `PodmanLogQuery` impl, the agent's command
|
||||
//! handler) constructs and consumes it; the dashboard renders it.
|
||||
//! That separation makes the trait easy to mock for unit tests and
|
||||
//! easy to evolve (e.g. add `level` per line) without churning every
|
||||
//! call site.
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use thiserror::Error;
|
||||
|
||||
/// Maximum byte length of a [`LogSource`] string. Bounded so a
|
||||
/// misbehaving agent can't OOM the dashboard with a multi-megabyte
|
||||
/// "source" field.
|
||||
pub const LOG_SOURCE_MAX_BYTES: usize = 128;
|
||||
|
||||
/// Stable identifier for the log stream a [`LogChunk`] came from
|
||||
/// (e.g. `podman:my-deployment/web`).
|
||||
///
|
||||
/// **Validated newtype**: the value originates on a NATS reply from
|
||||
/// an untrusted-until-authenticated agent, so we enforce shape
|
||||
/// constraints at the deserialization boundary. Rules:
|
||||
///
|
||||
/// - non-empty
|
||||
/// - no ASCII control characters (`\0`..`\x1f`, `\x7f`)
|
||||
/// - at most [`LOG_SOURCE_MAX_BYTES`] bytes (UTF-8 byte length)
|
||||
///
|
||||
/// A bad value is surfaced via [`LogQueryError::InvalidReply`], not
|
||||
/// folded into a generic decode failure — the dashboard logs the
|
||||
/// rejected string for triage.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub struct LogSource(String);
|
||||
|
||||
impl LogSource {
|
||||
pub fn try_new(name: impl Into<String>) -> Result<Self, InvalidLogSource> {
|
||||
let s = name.into();
|
||||
if s.is_empty() {
|
||||
return Err(InvalidLogSource::Empty);
|
||||
}
|
||||
if s.len() > LOG_SOURCE_MAX_BYTES {
|
||||
return Err(InvalidLogSource::TooLong {
|
||||
bytes: s.len(),
|
||||
max: LOG_SOURCE_MAX_BYTES,
|
||||
});
|
||||
}
|
||||
if let Some(c) = s.chars().find(|c| c.is_control()) {
|
||||
return Err(InvalidLogSource::ControlChar(c));
|
||||
}
|
||||
Ok(LogSource(s))
|
||||
}
|
||||
|
||||
pub fn as_str(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for LogSource {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Error, PartialEq, Eq)]
|
||||
pub enum InvalidLogSource {
|
||||
#[error("log source name must not be empty")]
|
||||
Empty,
|
||||
#[error("log source name is {bytes} bytes; max is {max}")]
|
||||
TooLong { bytes: usize, max: usize },
|
||||
#[error("log source name contains control char {0:?}")]
|
||||
ControlChar(char),
|
||||
}
|
||||
|
||||
/// A bounded chunk of log lines fetched from a device.
|
||||
///
|
||||
/// "Chunk" is deliberate: this isn't "the logs" — it's the last N
|
||||
/// lines, captured at one moment, in their original ordering. The
|
||||
/// dashboard renders the chunk; refreshing fetches a new chunk. v0.4
|
||||
/// will add a streaming variant; for v0.3 the customer's mental
|
||||
/// model is "show me what just happened", which a chunk answers
|
||||
/// directly.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct LogChunk {
|
||||
/// Stable identifier for the log stream this chunk came from
|
||||
/// (e.g. `podman:my-deployment/web`). Validated by [`LogSource`]
|
||||
/// at deserialization time — see that type's docs for the shape
|
||||
/// constraints.
|
||||
pub source: LogSource,
|
||||
/// When the chunk was captured **on the device**, not when the
|
||||
/// operator received it. Surfaces clock skew between operator and
|
||||
/// device — "logs from 12 seconds ago" is honest, "logs as of
|
||||
/// now" would be a lie.
|
||||
pub captured_at: DateTime<Utc>,
|
||||
/// Oldest-first. Empty when the container has produced no output
|
||||
/// yet. Each entry is one container log line as the runtime
|
||||
/// reported it; we do not split / merge / re-encode.
|
||||
pub lines: Vec<String>,
|
||||
/// `true` when the runtime had more output than fit in the chunk
|
||||
/// (either the per-request size budget or the runtime's own tail
|
||||
/// limit). Surfacing this lets the dashboard show a "showing last
|
||||
/// N of more" hint instead of silently hiding lines.
|
||||
pub truncated: bool,
|
||||
}
|
||||
|
||||
impl LogChunk {
|
||||
/// `true` when the device has produced no log lines yet for this
|
||||
/// deployment. Distinct from "the device is offline" (that's a
|
||||
/// transport error, not an empty chunk) — the dashboard renders
|
||||
/// the two differently.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.lines.is_empty()
|
||||
}
|
||||
|
||||
/// How many lines this chunk carries.
|
||||
pub fn len(&self) -> usize {
|
||||
self.lines.len()
|
||||
}
|
||||
}
|
||||
|
||||
/// Why a log query failed.
|
||||
///
|
||||
/// Four arms, deliberately mapped to four different operator actions:
|
||||
///
|
||||
/// - [`LogQueryError::DeviceOffline`] — no agent listening. The
|
||||
/// dashboard shows "device is offline"; retrying immediately is
|
||||
/// pointless.
|
||||
/// - [`LogQueryError::Timeout`] — agent didn't reply in time. Could
|
||||
/// be a slow podman socket, a saturated network, or a hung agent.
|
||||
/// The dashboard offers a retry button.
|
||||
/// - [`LogQueryError::Agent`] — the agent replied but reported its
|
||||
/// own failure (deployment not found, container has no log driver,
|
||||
/// etc.). The dashboard renders the message verbatim — it's the
|
||||
/// agent's responsibility to make these actionable.
|
||||
/// - [`LogQueryError::BadReply`] — agent replied with bytes that
|
||||
/// couldn't be deserialized. Indicates a version mismatch between
|
||||
/// operator and agent; the dashboard shows "device protocol
|
||||
/// mismatch — update the agent".
|
||||
/// - [`LogQueryError::Transport`] — generic NATS transport failure
|
||||
/// (publish failed, subscribe failed). Rare; usually a NATS server
|
||||
/// problem, not a device problem.
|
||||
///
|
||||
/// We mirror the categorization that `FleetCommandsClient::CommandError`
|
||||
/// uses for `Ping` — both share the same NATS request/reply substrate,
|
||||
/// so callers benefit from a uniform error shape across verbs.
|
||||
#[derive(Debug, Error)]
|
||||
pub enum LogQueryError {
|
||||
#[error("device offline (no agent listening on the logs subject)")]
|
||||
DeviceOffline,
|
||||
#[error("log query timed out after {0:?}")]
|
||||
Timeout(std::time::Duration),
|
||||
#[error("agent reported an error: {0}")]
|
||||
Agent(String),
|
||||
#[error("agent reply decode failed: {0}")]
|
||||
BadReply(#[from] serde_json::Error),
|
||||
#[error("nats transport: {0}")]
|
||||
Transport(String),
|
||||
/// The wire payload was syntactically valid but semantically
|
||||
/// unusable — e.g. the agent returned a `source` string that
|
||||
/// failed [`LogSource`] validation. We surface this as a
|
||||
/// distinct arm (rather than folding into `BadReply`) so the
|
||||
/// dashboard can log the bad value: it's almost always an agent
|
||||
/// bug.
|
||||
#[error("invalid reply contents: {0}")]
|
||||
InvalidReply(String),
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn pn(s: &str) -> LogSource {
|
||||
LogSource::try_new(s).expect("valid")
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn empty_chunk_reports_is_empty() {
|
||||
// "No lines yet" is a legal, common state — the dashboard
|
||||
// renders it as "no output". A separate state from "device
|
||||
// offline".
|
||||
let chunk = LogChunk {
|
||||
source: pn("podman:hello"),
|
||||
captured_at: Utc::now(),
|
||||
lines: vec![],
|
||||
truncated: false,
|
||||
};
|
||||
assert!(chunk.is_empty());
|
||||
assert_eq!(chunk.len(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn populated_chunk_preserves_line_order() {
|
||||
// The trait contract is "oldest first". Reordering would
|
||||
// misrender the dashboard (newest line at top of an
|
||||
// oldest-first list is the *first* entry).
|
||||
let chunk = LogChunk {
|
||||
source: pn("podman:hello"),
|
||||
captured_at: Utc::now(),
|
||||
lines: vec!["a".into(), "b".into(), "c".into()],
|
||||
truncated: false,
|
||||
};
|
||||
assert_eq!(chunk.lines, vec!["a", "b", "c"]);
|
||||
assert_eq!(chunk.len(), 3);
|
||||
assert!(!chunk.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn truncated_flag_is_independent_of_emptiness() {
|
||||
// A chunk can be both truncated and have lines (we cut the
|
||||
// head) — assert both states coexist.
|
||||
let chunk = LogChunk {
|
||||
source: pn("podman:hello"),
|
||||
captured_at: Utc::now(),
|
||||
lines: vec!["latest".into()],
|
||||
truncated: true,
|
||||
};
|
||||
assert!(!chunk.is_empty());
|
||||
assert!(chunk.truncated);
|
||||
}
|
||||
}
|
||||
125
fleet/harmony-fleet-deploy/src/companion/logs/mod.rs
Normal file
125
fleet/harmony-fleet-deploy/src/companion/logs/mod.rs
Normal file
@@ -0,0 +1,125 @@
|
||||
//! Log-tail contract — the third Score companion (ADR-023 P7).
|
||||
//!
|
||||
//! ## The big picture
|
||||
//!
|
||||
//! ```text
|
||||
//! ┌────────────┐ pair ┌─────────────┐ request ┌──────────┐
|
||||
//! │ *Score │◀─type─────│ LogQuery │──────────────▶│ Agent │
|
||||
//! └────────────┘ locked └─────────────┘ (NATS r/r) └──────────┘
|
||||
//! │ │
|
||||
//! └─── deserialize reply ◀─────┘
|
||||
//! │
|
||||
//! ▼
|
||||
//! LogChunk
|
||||
//! ```
|
||||
//!
|
||||
//! - [`LogQuery`] is the companion trait: pair a [`Score`] with a way
|
||||
//! to fetch the last N lines of *that specific Score's* container
|
||||
//! logs from the device that's running it. The pairing is type-level
|
||||
//! via the `Score` associated type — the compiler refuses to call
|
||||
//! `log_query.last_lines(&podman_score, …)` with a query whose
|
||||
//! `Score = FleetOperatorScore`. Same trick the smoke contract uses
|
||||
//! (see [`super::smoke::SmokeTest`]).
|
||||
//! - [`LogChunk`] is the value the trait returns. It's a pure data
|
||||
//! type — easy to serialize for the dashboard, easy to assert
|
||||
//! against in tests, no transport types leak through.
|
||||
//! - [`PodmanLogQuery`] is the only implementor in v0.3: it sends a
|
||||
//! `device-commands.<id>.logs` request over NATS, deserializes the
|
||||
//! reply into [`LogChunk`]. Agent-side handler + dashboard wiring
|
||||
//! ship in a follow-up PR (see "Deferred" below).
|
||||
//!
|
||||
//! ## Why a companion and not a method on `Score`?
|
||||
//!
|
||||
//! Per ADR-023 P7, framework capabilities that "wrap" a Score (smoke,
|
||||
//! planning, observability, log tail) attach as companions rather
|
||||
//! than as new methods on `Score`/`Interpret`. The base trait surface
|
||||
//! stays one method (`create_interpret`). Log tail is *something
|
||||
//! paired with* a Score, not *something on* a Score — and a Score
|
||||
//! without a `LogQuery` should render "no logs view" on the
|
||||
//! dashboard, not become a compilation error.
|
||||
//!
|
||||
//! The orthogonal benefits, which mirror smoke's:
|
||||
//!
|
||||
//! - A Score can be deployed with one of several log backends (podman,
|
||||
//! journald, k8s) without rewriting the Score itself.
|
||||
//! - Test harnesses substitute a mock `LogQuery` for an end-to-end
|
||||
//! probe of the dashboard's "View logs" path, without spinning up a
|
||||
//! real device.
|
||||
//! - Adding a "live tail (stream)" verb in v0.4 is an additive trait
|
||||
//! method on `LogQuery`, not a breaking change to `Score`.
|
||||
//!
|
||||
//! ## Why sync `last_lines` and not a stream?
|
||||
//!
|
||||
//! v0.3 ships **single-shot** log tail only. Customer's mental model
|
||||
//! is "I clicked View logs, show me the last 100 lines". That's a
|
||||
//! `Future<LogChunk>`, not a `Stream<Item = String>`. The
|
||||
//! request/reply NATS pattern (one inbox, one reply, then close) maps
|
||||
//! 1:1 to that mental model and reuses the same dispatcher the
|
||||
//! `Ping` verb already runs through.
|
||||
//!
|
||||
//! Live tail is genuinely useful — but it's a separate UX (the
|
||||
//! dashboard needs scrollback management, the agent needs to attach
|
||||
//! to a running container's log stream, the wire format needs frame
|
||||
//! framing + final-marker handling) — and shipping it half-baked
|
||||
//! would degrade the "View logs" experience the customer is asking
|
||||
//! for *today*. v0.4 adds a `tail_stream` method to this trait,
|
||||
//! additive.
|
||||
//!
|
||||
//! ## Wire shape (locked in this PR)
|
||||
//!
|
||||
//! Subject: `device-commands.<device_id>.logs` — exactly the existing
|
||||
//! callout-permitted prefix (see
|
||||
//! `harmony_reconciler_contracts::SUBJECT_PREFIX`). The deployment
|
||||
//! name and the line count live in the JSON request body, not in the
|
||||
//! subject, because:
|
||||
//!
|
||||
//! 1. The agent already subscribes to `device-commands.<id>.>`; the
|
||||
//! permission template covers this subject without any callout
|
||||
//! JWT change (zero ops surface).
|
||||
//! 2. Putting the deployment in the subject would alias the verb
|
||||
//! token, breaking
|
||||
//! [`harmony_reconciler_contracts::Verb::as_subject_token`]'s
|
||||
//! "verb is the trailing token" invariant.
|
||||
//!
|
||||
//! The request body is [`harmony_reconciler_contracts::LogsRequest`],
|
||||
//! the reply body is [`harmony_reconciler_contracts::LogsReply`].
|
||||
//! Those types live in `reconciler-contracts` (not here) so the
|
||||
//! agent build, which must not depend on harmony, can serialize them.
|
||||
//! This companion is a thin wrapper around that contract — it owns
|
||||
//! transport, the contract crate owns the bytes.
|
||||
//!
|
||||
//! ## Deferred to follow-up PRs (in scope of v0.3)
|
||||
//!
|
||||
//! - **Agent-side command handler** for `Verb::Logs` in
|
||||
//! `fleet/harmony-fleet-agent/src/command_server.rs`: parse the
|
||||
//! `LogsRequest`, look up the deployment's container, run
|
||||
//! `podman logs --tail N`, serialize a `LogsReply`. The
|
||||
//! deployment→container resolution must reject names that don't
|
||||
//! match `[a-zA-Z0-9_.-]{1,128}` before invoking the runtime —
|
||||
//! defense-in-depth on top of `Command::arg` (which already
|
||||
//! avoids shell parsing). That regex is *stricter* than
|
||||
//! [`harmony_reconciler_contracts::DeploymentName`] on the wire,
|
||||
//! reflecting that the agent's resolved container name has tighter
|
||||
//! constraints than a deployment identifier.
|
||||
//! - **Dashboard handler** at
|
||||
//! `/deployments/<name>/devices/<id>/logs?lines=N` in
|
||||
//! `fleet/harmony-fleet-operator/src/frontend/server.rs`:
|
||||
//! instantiate a `PodmanLogQuery`, call `last_lines`, render the
|
||||
//! `LogChunk` as maud.
|
||||
//! - **E2E integration test** that drives a real podman container
|
||||
//! end-to-end through this trait.
|
||||
//!
|
||||
//! Splitting this is a scope-discipline call: the trait + value +
|
||||
//! NATS-transport impl + unit tests are merge-worthy on their own
|
||||
//! because they lock the contract. Once merged, the agent handler PR
|
||||
//! is a focused diff against `command_server.rs` and the dashboard
|
||||
//! handler PR is a focused diff against `server.rs` — both small,
|
||||
//! both reviewable, both backed by this PR's existing tests.
|
||||
|
||||
pub mod chunk;
|
||||
pub mod podman;
|
||||
pub mod query;
|
||||
|
||||
pub use chunk::{LogChunk, LogQueryError};
|
||||
pub use podman::PodmanLogQuery;
|
||||
pub use query::LogQuery;
|
||||
378
fleet/harmony-fleet-deploy/src/companion/logs/podman.rs
Normal file
378
fleet/harmony-fleet-deploy/src/companion/logs/podman.rs
Normal file
@@ -0,0 +1,378 @@
|
||||
//! `PodmanLogQuery` — the NATS request/reply implementation of
|
||||
//! [`LogQuery`] for a [`PodmanV0Score`].
|
||||
//!
|
||||
//! Transport: NATS request on
|
||||
//! `device-commands.<device_id>.logs`. Body: a JSON
|
||||
//! [`LogsRequest`] carrying the deployment name and a clamped line
|
||||
//! count. Reply: a JSON [`LogsReply`] which we re-wrap into a
|
||||
//! [`LogChunk`].
|
||||
//!
|
||||
//! The query is constructed with the device id and deployment name
|
||||
//! it targets — those are the routing parameters that the dashboard
|
||||
//! handler already has on the URL path
|
||||
//! (`/deployments/<name>/devices/<id>/logs`). The `LogQuery` trait
|
||||
//! method takes `&Self::Score` for the compile-time pairing
|
||||
//! guarantee, but the wire request doesn't need the score body
|
||||
//! itself (the agent reads its own deployment KV).
|
||||
//!
|
||||
//! Why not let the trait method take `device_id`/`deployment` as
|
||||
//! parameters? Because the trait must stay backend-agnostic. A
|
||||
//! future `K8sLogQuery` doesn't have a device id; it has a pod name.
|
||||
//! Routing parameters belong on the implementor, not on the trait.
|
||||
//!
|
||||
//! ## Split between routing and transport
|
||||
//!
|
||||
//! [`LogQueryRouting`] is the pure routing value: device id +
|
||||
//! deployment + line cap, no transport. It owns `subject()` and
|
||||
//! `request_body()` so unit tests can verify the wire bytes without
|
||||
//! constructing a NATS client. [`PodmanLogQuery`] adds the
|
||||
//! `async_nats::Client` on top and implements [`LogQuery`].
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use async_nats::Client;
|
||||
use async_nats::error::Error as NatsError;
|
||||
use async_trait::async_trait;
|
||||
use harmony::modules::podman::{PodmanTopology, PodmanV0Score};
|
||||
use harmony_reconciler_contracts::{
|
||||
CommandRequest, DeploymentName, ErrorReply, Id, LOGS_MAX_LINES, LogsReply, LogsRequest, Verb,
|
||||
device_command_subject,
|
||||
};
|
||||
|
||||
use super::chunk::{LogChunk, LogQueryError, LogSource};
|
||||
use super::query::LogQuery;
|
||||
|
||||
/// Default reply timeout for a single log query. 10 s gives a slow
|
||||
/// podman socket (cold-cached image, busy device) room to breathe;
|
||||
/// offline devices get short-circuited earlier by NATS's
|
||||
/// `no_responders` reply, so the timeout only matters when the agent
|
||||
/// is alive but slow.
|
||||
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Routing parameters for one log query. Pure data — no NATS client,
|
||||
/// no IO. Lets unit tests verify the wire bytes without a connected
|
||||
/// client.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct LogQueryRouting {
|
||||
pub device_id: Id,
|
||||
pub deployment: DeploymentName,
|
||||
}
|
||||
|
||||
impl LogQueryRouting {
|
||||
pub fn new(device_id: Id, deployment: DeploymentName) -> Self {
|
||||
Self {
|
||||
device_id,
|
||||
deployment,
|
||||
}
|
||||
}
|
||||
|
||||
/// Build the NATS subject this query publishes to. Kept on the
|
||||
/// routing value (not on the transport struct) so the agent-side
|
||||
/// dispatcher tests in the follow-up PR can use the same helper
|
||||
/// without pulling in a NATS client.
|
||||
pub fn subject(&self) -> String {
|
||||
device_command_subject(self.device_id.to_string().as_str(), Verb::Logs)
|
||||
}
|
||||
|
||||
/// Serialize the wire body. Clamps `n` to [`LOGS_MAX_LINES`] so
|
||||
/// a buggy "show all" dashboard button can't ask for an
|
||||
/// unbounded transfer.
|
||||
pub fn request_body(&self, n: usize) -> Vec<u8> {
|
||||
let req = CommandRequest::Logs(LogsRequest {
|
||||
deployment: self.deployment.clone(),
|
||||
lines: clamp_lines(n),
|
||||
});
|
||||
serde_json::to_vec(&req).expect("CommandRequest is always serializable")
|
||||
}
|
||||
}
|
||||
|
||||
/// Pin the conversion `usize -> u32` to one place so the clamping
|
||||
/// behavior is testable in isolation. We saturate at
|
||||
/// [`LOGS_MAX_LINES`] both because (a) the agent will clamp anyway,
|
||||
/// and (b) sending a request advertising a smaller `lines` value
|
||||
/// than the agent's cap makes the protocol log easier to read.
|
||||
fn clamp_lines(n: usize) -> u32 {
|
||||
let n_u32 = u32::try_from(n).unwrap_or(LOGS_MAX_LINES);
|
||||
n_u32.min(LOGS_MAX_LINES)
|
||||
}
|
||||
|
||||
/// Operator-side log fetcher for a podman-runtime device.
|
||||
///
|
||||
/// Holds the [`LogQueryRouting`] for the device+deployment it
|
||||
/// targets plus the connected NATS client; the dashboard builds one
|
||||
/// per request from the URL parts, then calls
|
||||
/// `last_lines(&score, &topology, n)`.
|
||||
pub struct PodmanLogQuery {
|
||||
nc: Client,
|
||||
routing: LogQueryRouting,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl PodmanLogQuery {
|
||||
pub fn new(nc: Client, device_id: Id, deployment: DeploymentName) -> Self {
|
||||
Self {
|
||||
nc,
|
||||
routing: LogQueryRouting::new(device_id, deployment),
|
||||
timeout: DEFAULT_TIMEOUT,
|
||||
}
|
||||
}
|
||||
|
||||
/// Override the default reply timeout. Tests use a very small
|
||||
/// value to keep the suite fast; production sticks with
|
||||
/// [`DEFAULT_TIMEOUT`].
|
||||
pub fn with_timeout(mut self, timeout: Duration) -> Self {
|
||||
self.timeout = timeout;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn routing(&self) -> &LogQueryRouting {
|
||||
&self.routing
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LogQuery<PodmanTopology> for PodmanLogQuery {
|
||||
type Score = PodmanV0Score;
|
||||
|
||||
async fn last_lines(
|
||||
&self,
|
||||
_score: &Self::Score,
|
||||
_topology: &PodmanTopology,
|
||||
n: usize,
|
||||
) -> Result<LogChunk, LogQueryError> {
|
||||
let subject = self.routing.subject();
|
||||
let payload = self.routing.request_body(n);
|
||||
|
||||
let fut = self.nc.request(subject, payload.into());
|
||||
let resp = match tokio::time::timeout(self.timeout, fut).await {
|
||||
Ok(Ok(msg)) => msg,
|
||||
Ok(Err(err)) => return Err(map_request_error(err, self.timeout)),
|
||||
Err(_) => return Err(LogQueryError::Timeout(self.timeout)),
|
||||
};
|
||||
|
||||
decode_reply(&resp.payload)
|
||||
}
|
||||
}
|
||||
|
||||
/// Map an `async-nats` `RequestError` to our typed surface. Same
|
||||
/// shape `FleetCommandsClient` uses for `Ping` — we propagate
|
||||
/// `NoResponders` and `TimedOut` so the dashboard can render them
|
||||
/// differently.
|
||||
fn map_request_error(
|
||||
err: NatsError<async_nats::client::RequestErrorKind>,
|
||||
timeout: Duration,
|
||||
) -> LogQueryError {
|
||||
use async_nats::client::RequestErrorKind;
|
||||
match err.kind() {
|
||||
RequestErrorKind::NoResponders => LogQueryError::DeviceOffline,
|
||||
RequestErrorKind::TimedOut => LogQueryError::Timeout(timeout),
|
||||
_ => LogQueryError::Transport(err.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Decode a reply payload. The agent can reply with either a
|
||||
/// [`LogsReply`] (happy path) or an [`ErrorReply`] (the agent
|
||||
/// successfully received the request but couldn't fulfil it). We try
|
||||
/// `LogsReply` first because it's the common case; only if that
|
||||
/// fails do we attempt `ErrorReply`. Falling all the way through
|
||||
/// yields [`LogQueryError::InvalidReply`].
|
||||
///
|
||||
/// `pub(crate)` so unit tests can drive it without a live NATS
|
||||
/// server. Not part of the public API — callers always go through
|
||||
/// [`PodmanLogQuery::last_lines`].
|
||||
pub(crate) fn decode_reply(bytes: &[u8]) -> Result<LogChunk, LogQueryError> {
|
||||
// Happy path first — keeps the cold path cost on errors only.
|
||||
if let Ok(reply) = serde_json::from_slice::<LogsReply>(bytes) {
|
||||
let source = LogSource::try_new(reply.source.clone())
|
||||
.map_err(|e| LogQueryError::InvalidReply(format!("invalid source: {e}")))?;
|
||||
return Ok(LogChunk {
|
||||
source,
|
||||
captured_at: reply.captured_at,
|
||||
lines: reply.lines,
|
||||
truncated: reply.truncated,
|
||||
});
|
||||
}
|
||||
if let Ok(err) = serde_json::from_slice::<ErrorReply>(bytes) {
|
||||
return Err(LogQueryError::Agent(err.message));
|
||||
}
|
||||
// Neither shape matched — the agent is speaking a protocol we
|
||||
// don't understand. The dashboard will render this as a version
|
||||
// mismatch.
|
||||
Err(LogQueryError::InvalidReply(format!(
|
||||
"reply did not match LogsReply or ErrorReply ({} bytes)",
|
||||
bytes.len()
|
||||
)))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use chrono::{DateTime, Utc};
|
||||
use harmony_reconciler_contracts::{ErrorKind, ErrorReply};
|
||||
|
||||
fn dn(s: &str) -> DeploymentName {
|
||||
DeploymentName::try_new(s).expect("valid")
|
||||
}
|
||||
|
||||
fn routing() -> LogQueryRouting {
|
||||
LogQueryRouting::new(Id::from("pi-42".to_string()), dn("hello-web"))
|
||||
}
|
||||
|
||||
// -- clamp_lines ---------------------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn clamp_lines_caps_at_protocol_max() {
|
||||
// Operators can call `last_lines(usize::MAX, …)` — the wire
|
||||
// value must stay within the budget the agent enforces.
|
||||
assert_eq!(clamp_lines(usize::MAX), LOGS_MAX_LINES);
|
||||
assert_eq!(clamp_lines(LOGS_MAX_LINES as usize + 1), LOGS_MAX_LINES);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clamp_lines_passes_through_small_values() {
|
||||
assert_eq!(clamp_lines(0), 0);
|
||||
assert_eq!(clamp_lines(1), 1);
|
||||
assert_eq!(clamp_lines(100), 100);
|
||||
}
|
||||
|
||||
// -- LogQueryRouting::subject() -----------------------------------------
|
||||
|
||||
#[test]
|
||||
fn subject_matches_documented_format() {
|
||||
// The subject the agent's existing `device-commands.<id>.>`
|
||||
// subscription receives. If this drifts, the callout
|
||||
// permission template stops covering the request — silent
|
||||
// routing failure under JWT enforcement.
|
||||
let r = routing();
|
||||
assert_eq!(r.subject(), "device-commands.pi-42.logs");
|
||||
}
|
||||
|
||||
// -- LogQueryRouting::request_body() ------------------------------------
|
||||
|
||||
#[test]
|
||||
fn request_body_is_tagged_json_with_clamped_lines() {
|
||||
// Locks the wire format. Older agents that learn `Verb::Logs`
|
||||
// expect this exact shape on receipt.
|
||||
let r = routing();
|
||||
let body = r.request_body(50);
|
||||
let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
||||
assert_eq!(parsed["verb"], "logs");
|
||||
assert_eq!(parsed["deployment"], "hello-web");
|
||||
assert_eq!(parsed["lines"], 50);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn request_body_clamps_oversized_n() {
|
||||
// A dashboard with a buggy "show all" button must not get
|
||||
// through unchecked. We clamp on the operator side too —
|
||||
// defense in depth, and a more honest log message ("asked
|
||||
// for 1000") than "asked for usize::MAX, agent capped".
|
||||
let r = routing();
|
||||
let body = r.request_body(usize::MAX);
|
||||
let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
||||
assert_eq!(parsed["lines"], LOGS_MAX_LINES);
|
||||
}
|
||||
|
||||
// -- decode_reply() ------------------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn decode_reply_unwraps_logs_reply_into_log_chunk() {
|
||||
let reply = LogsReply {
|
||||
source: "podman:hello-web/api".to_string(),
|
||||
captured_at: DateTime::parse_from_rfc3339("2026-01-15T12:34:56Z")
|
||||
.unwrap()
|
||||
.with_timezone(&Utc),
|
||||
lines: vec!["line a".into(), "line b".into()],
|
||||
truncated: false,
|
||||
};
|
||||
let bytes = serde_json::to_vec(&reply).unwrap();
|
||||
let chunk = decode_reply(&bytes).expect("valid LogsReply must decode");
|
||||
assert_eq!(chunk.source.as_str(), "podman:hello-web/api");
|
||||
assert_eq!(chunk.lines, vec!["line a", "line b"]);
|
||||
assert!(!chunk.truncated);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decode_reply_carries_truncated_flag() {
|
||||
// The dashboard renders "showing last N of more" off this
|
||||
// flag. If we accidentally drop it on the wire, that hint
|
||||
// disappears and customers think they have full output.
|
||||
let reply = LogsReply {
|
||||
source: "podman:hello-web/api".to_string(),
|
||||
captured_at: Utc::now(),
|
||||
lines: vec!["only".into()],
|
||||
truncated: true,
|
||||
};
|
||||
let bytes = serde_json::to_vec(&reply).unwrap();
|
||||
let chunk = decode_reply(&bytes).unwrap();
|
||||
assert!(chunk.truncated);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decode_reply_surfaces_agent_error_reply_as_agent_variant() {
|
||||
// The agent can fail without crashing — e.g. the deployment
|
||||
// exists in KV but the container hasn't started yet. That
|
||||
// must surface as `Agent(message)` so the dashboard renders
|
||||
// the agent's own description, not "device offline".
|
||||
let err = ErrorReply {
|
||||
kind: ErrorKind::BadRequest,
|
||||
message: "deployment hello-web has no containers running".into(),
|
||||
};
|
||||
let bytes = serde_json::to_vec(&err).unwrap();
|
||||
let result = decode_reply(&bytes);
|
||||
match result {
|
||||
Err(LogQueryError::Agent(msg)) => {
|
||||
assert!(msg.contains("hello-web"));
|
||||
}
|
||||
other => panic!("expected Agent variant, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decode_reply_rejects_invalid_source_name() {
|
||||
// A malicious or buggy agent could return a `source` with
|
||||
// control characters that would corrupt the dashboard.
|
||||
// LogSource validation catches this — we surface it as
|
||||
// InvalidReply, not a panic, and not a successful chunk.
|
||||
let bad = serde_json::json!({
|
||||
"source": "bad\nname",
|
||||
"captured_at": "2026-01-15T12:00:00Z",
|
||||
"lines": [],
|
||||
"truncated": false,
|
||||
});
|
||||
let bytes = serde_json::to_vec(&bad).unwrap();
|
||||
match decode_reply(&bytes) {
|
||||
Err(LogQueryError::InvalidReply(_)) => {}
|
||||
other => panic!("expected InvalidReply, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decode_reply_falls_through_to_invalid_for_unknown_shape() {
|
||||
// Neither LogsReply nor ErrorReply — a future verb's reply
|
||||
// accidentally routed here, or an upgraded agent talking a
|
||||
// newer protocol. The dashboard shows "protocol mismatch".
|
||||
let bytes = br#"{"completely":"unrelated"}"#;
|
||||
match decode_reply(bytes) {
|
||||
Err(LogQueryError::InvalidReply(_)) => {}
|
||||
other => panic!("expected InvalidReply, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
// -- LogQuery pairing (compile-time) ------------------------------------
|
||||
|
||||
/// Compile-time test: `PodmanLogQuery::Score == PodmanV0Score`.
|
||||
/// If a future refactor changes the associated type without
|
||||
/// updating callers, this test fails to compile — which is
|
||||
/// exactly the compile-time-safety guarantee the companion
|
||||
/// pattern exists to provide.
|
||||
#[test]
|
||||
fn paired_score_type_is_podman_v0_score() {
|
||||
fn assert_pair<Q>()
|
||||
where
|
||||
Q: LogQuery<PodmanTopology, Score = PodmanV0Score>,
|
||||
{
|
||||
}
|
||||
assert_pair::<PodmanLogQuery>();
|
||||
}
|
||||
}
|
||||
66
fleet/harmony-fleet-deploy/src/companion/logs/query.rs
Normal file
66
fleet/harmony-fleet-deploy/src/companion/logs/query.rs
Normal file
@@ -0,0 +1,66 @@
|
||||
//! The `LogQuery` trait — companion contract for "fetch the last N
|
||||
//! lines of this Score's container logs".
|
||||
//!
|
||||
//! The trait pairs a [`Score`] with a way to query its logs, with the
|
||||
//! pairing locked at the type level via the `Score` associated type.
|
||||
//! Same mechanism the smoke contract uses; same compile-time
|
||||
//! guarantee that callers can't pass a `PodmanLogQuery` to a
|
||||
//! `FleetOperatorScore`.
|
||||
//!
|
||||
//! Object safety: kept *not* object-safe on purpose. Callers in v0.3
|
||||
//! always know the concrete Score type at the call site (the
|
||||
//! dashboard handler is `Score = PodmanV0Score` at compile time —
|
||||
//! topologies are compile-time per ADR-023 P6). When a second Score
|
||||
//! type starts shipping logs (e.g. k8s `Deployment`), we add another
|
||||
//! impl, not a `Box<dyn LogQuery>`.
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
use harmony::score::Score;
|
||||
use harmony::topology::Topology;
|
||||
|
||||
use super::chunk::{LogChunk, LogQueryError};
|
||||
|
||||
/// Pair a [`Score`] with a way to fetch the last N lines of its
|
||||
/// container logs.
|
||||
///
|
||||
/// Implementors live next to the transport they use:
|
||||
/// [`super::PodmanLogQuery`] for NATS request/reply against the fleet
|
||||
/// agent's podman runtime. Future impls (`K8sLogQuery`,
|
||||
/// `JournaldLogQuery`) follow the same shape.
|
||||
///
|
||||
/// Why a trait and not a free function? Because mocking is the point
|
||||
/// of this companion. The dashboard handler takes
|
||||
/// `&dyn LogQuery<…>` (statically dispatched in v0.3, but the trait
|
||||
/// shape is mock-friendly): the e2e test substitutes a
|
||||
/// `FixtureLogQuery` that returns a canned `LogChunk` without needing
|
||||
/// a real device, a real NATS server, or a real podman socket. That
|
||||
/// makes the dashboard's "View logs" path testable in seconds, not
|
||||
/// minutes.
|
||||
#[async_trait]
|
||||
pub trait LogQuery<T: Topology>: Send + Sync {
|
||||
/// The Score this query is paired with. Type-locked so the
|
||||
/// compiler refuses `podman.last_lines(&operator_score, …)` —
|
||||
/// the wrong Score type doesn't even compile.
|
||||
type Score: Score<T>;
|
||||
|
||||
/// Fetch the most recent `n` lines of `score`'s container logs.
|
||||
///
|
||||
/// Semantics:
|
||||
/// - Returns oldest-first lines, up to `n`.
|
||||
/// - If the runtime has fewer than `n` lines, returns all of them
|
||||
/// with `LogChunk::truncated = false`.
|
||||
/// - If the per-request size budget cuts the response,
|
||||
/// `LogChunk::truncated = true`.
|
||||
/// - `n == 0` is legal and returns an empty chunk. Callers should
|
||||
/// prefer a sensible default (e.g. 100) at the UI layer rather
|
||||
/// than relying on this edge case.
|
||||
///
|
||||
/// Failure modes are categorized via [`LogQueryError`].
|
||||
async fn last_lines(
|
||||
&self,
|
||||
score: &Self::Score,
|
||||
topology: &T,
|
||||
n: usize,
|
||||
) -> Result<LogChunk, LogQueryError>;
|
||||
}
|
||||
@@ -13,13 +13,19 @@
|
||||
//! - [`AgentObservation`] — derived view of a fleet agent's KV watch
|
||||
//! filter, computed from a typed `AgentConfig`. Used by the e2e
|
||||
//! harness to write desired-state keys the agent will pick up.
|
||||
//! - [`smoke`] — the smoke-test contract. `Probe` trait, `SmokeSuite`
|
||||
//! composition, `SmokeTest` companion, and the `deploy`/
|
||||
//! `deploy_with_smoke` wrappers that bind a Score's interpret to
|
||||
//! an optional smoke run. Implements ADR-023 P4 ("deploy returns
|
||||
//! only after smoke-test success") via the companion seam from P7.
|
||||
//! - [`smoke`] — the smoke-test contract. One `SmokeTest` trait with
|
||||
//! a single `verify` method returning a `SmokeReport`, plus
|
||||
//! `deploy` / `deploy_with_smoke` free functions. Implements
|
||||
//! ADR-023 P4 ("deploy returns only after smoke-test success") via
|
||||
//! the companion seam from P7.
|
||||
//! - [`logs`] — the log-tail contract. `LogQuery` companion trait,
|
||||
//! `LogChunk` value, and the `PodmanLogQuery` NATS request/reply
|
||||
//! implementation. Powers the dashboard's "View logs" path —
|
||||
//! customer clicks the button, gets the last N lines from the
|
||||
//! device. Live tail (streaming) is v0.4.
|
||||
|
||||
pub mod agent;
|
||||
pub mod logs;
|
||||
pub mod smoke;
|
||||
|
||||
pub use agent::AgentObservation;
|
||||
|
||||
633
fleet/harmony-fleet-deploy/src/companion/smoke.rs
Normal file
633
fleet/harmony-fleet-deploy/src/companion/smoke.rs
Normal file
@@ -0,0 +1,633 @@
|
||||
//! Smoke-test companion — minimal shape.
|
||||
//!
|
||||
//! Implements ADR-023 P4 ("deploy returns only after smoke-test
|
||||
//! success") and P7 ("extend Scores with companions, not API
|
||||
//! changes"). The earlier draft of this module decomposed smoke into
|
||||
//! `Probe` / `ProbeAttempt` / `ProbeOutcome` / `ProbeFailure` /
|
||||
//! `ProbeName` / `RetryPolicy` / `SmokeSuite` / `SmokeStage` /
|
||||
//! `SmokeReport` / `SmokeTest` / `SmokeAssemblyError`. That was
|
||||
//! cardinality matching gone overboard for what is, in the end, "run
|
||||
//! an async function after deploy and surface a pipeline report."
|
||||
//! This rewrite picks the smallest seam that still buys the things we
|
||||
//! actually need.
|
||||
//!
|
||||
//! ## What we kept
|
||||
//!
|
||||
//! - [`SmokeTest`] — one trait, one async method. Implementer writes a
|
||||
//! normal Rust function and returns a [`SmokeReport`].
|
||||
//! - Type-level pairing via the associated `type Interpret`
|
||||
//! (suggested in PR #292 review): one smoke can cover every Score
|
||||
//! that shares an Interpret (e.g. `HelmChartScore` + any preset on
|
||||
//! top of it). **Pairing is convention-only in v0.3** —
|
||||
//! `Score::create_interpret` returns a `Box<dyn Interpret<T>>`, so
|
||||
//! the `deploy_with_smoke` call site can't statically refuse a
|
||||
//! mismatched smoke. Closing that gap is an additive refactor on
|
||||
//! the `Score` trait (`type Interpret: Interpret<T>;`) and is
|
||||
//! deliberately deferred — the API shape supports it.
|
||||
//! - [`deploy`] / [`deploy_with_smoke`] — free functions. `deploy`
|
||||
//! runs the Score; `deploy_with_smoke` runs the Score then blocks
|
||||
//! on smoke.
|
||||
//! - Pipeline data the dashboard renders top-to-bottom
|
||||
//! ([`CheckReport`] entries on [`SmokeReport`]).
|
||||
//!
|
||||
//! ## What we dropped, and what to do instead
|
||||
//!
|
||||
//! | Old type | Replaced by |
|
||||
//! |---------------------------------------|-----------------------------------------------------------------------------|
|
||||
//! | `Probe`, `ProbeAttempt`, `ProbeOutcome`, `ProbeFailure` | Write a normal `async fn` inside `verify`; push `CheckReport`s into a `Vec`. |
|
||||
//! | `ProbeName` (validated newtype) | `&'static str` — the names come from our own static code, not user input. |
|
||||
//! | `RetryPolicy` value type | [`poll_until`] free function. Pass the budget + interval directly. |
|
||||
//! | `SmokeSuite` builder | A `Vec<CheckReport>` collected inside `verify`. No intermediate type. |
|
||||
//! | `TcpReachable` probe impl | [`tcp_reachable`] free function. Call it; it returns a `CheckReport`. |
|
||||
//!
|
||||
//! Reusability is preserved: the helpers ([`poll_until`],
|
||||
//! [`tcp_reachable`]) compose by simple function call. A future
|
||||
//! `http_healthy(url)` or `k8s_pod_ready(client, name)` belongs in
|
||||
//! this module as another `async fn -> CheckReport`, not as another
|
||||
//! trait.
|
||||
|
||||
use std::future::Future;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use harmony::interpret::{Interpret, InterpretError, Outcome};
|
||||
use harmony::inventory::Inventory;
|
||||
use harmony::score::Score;
|
||||
use harmony::topology::Topology;
|
||||
use thiserror::Error;
|
||||
use tracing::{info, info_span};
|
||||
|
||||
/// Pair an [`Interpret`] type with an async smoke verification.
|
||||
///
|
||||
/// One method. Implementers compose their checks inline; no probe
|
||||
/// trait, no suite builder, no retry-policy value type. If retries
|
||||
/// are needed, wrap the closure with [`poll_until`]; otherwise just
|
||||
/// `await` once.
|
||||
///
|
||||
/// Associating with `Interpret` (not the concrete `Score`) lets a
|
||||
/// single smoke impl cover a family of Scores that share an
|
||||
/// Interpret. Convention-only enforcement in v0.3 — see the
|
||||
/// module-level docs.
|
||||
#[async_trait]
|
||||
pub trait SmokeTest<T: Topology>: Send + Sync {
|
||||
/// The Interpret class this smoke verifies. Documentary in v0.3;
|
||||
/// becomes compile-time enforceable once `Score` gains its own
|
||||
/// `type Interpret`.
|
||||
type Interpret: Interpret<T>;
|
||||
|
||||
/// Run all checks. Implementers are free to short-circuit on
|
||||
/// first failure, or run every check to give the dashboard a
|
||||
/// full per-step picture. The framework reads `report.passed()`
|
||||
/// to gate the deploy and renders `report.checks` for the user.
|
||||
async fn verify(&self, topology: &T) -> SmokeReport;
|
||||
}
|
||||
|
||||
/// Aggregated smoke result. Dashboards render `checks` top-to-bottom.
|
||||
///
|
||||
/// A report is "passing" iff it has at least one check AND every
|
||||
/// check passed. An empty report passes nothing — the framework
|
||||
/// rejects empty reports the same way it rejects failing ones, so a
|
||||
/// smoke impl that forgets to push any checks fails loudly instead
|
||||
/// of letting a deploy through.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct SmokeReport {
|
||||
pub checks: Vec<CheckReport>,
|
||||
}
|
||||
|
||||
impl SmokeReport {
|
||||
pub fn passed(&self) -> bool {
|
||||
!self.checks.is_empty() && self.checks.iter().all(|c| c.passed)
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.checks.is_empty()
|
||||
}
|
||||
|
||||
/// Iterator over only the failures — handy for compact error
|
||||
/// rendering without filtering at the call site.
|
||||
pub fn failed(&self) -> impl Iterator<Item = &CheckReport> {
|
||||
self.checks.iter().filter(|c| !c.passed)
|
||||
}
|
||||
}
|
||||
|
||||
/// One step's result, named for the dashboard and the operator's
|
||||
/// log. `detail` is `Some` whenever there's something useful to show
|
||||
/// (a connect error, the elapsed time, the response body's first
|
||||
/// line) — leave it `None` for trivial passes.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CheckReport {
|
||||
pub name: &'static str,
|
||||
pub passed: bool,
|
||||
pub detail: Option<String>,
|
||||
}
|
||||
|
||||
impl CheckReport {
|
||||
pub fn pass(name: &'static str) -> Self {
|
||||
Self {
|
||||
name,
|
||||
passed: true,
|
||||
detail: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn pass_with(name: &'static str, detail: impl Into<String>) -> Self {
|
||||
Self {
|
||||
name,
|
||||
passed: true,
|
||||
detail: Some(detail.into()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn fail(name: &'static str, detail: impl Into<String>) -> Self {
|
||||
Self {
|
||||
name,
|
||||
passed: false,
|
||||
detail: Some(detail.into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll a fallible async closure until it returns `Ok` or `budget`
|
||||
/// elapses. Returns a single [`CheckReport`] capturing the outcome.
|
||||
///
|
||||
/// Use this inside [`SmokeTest::verify`] when you need retries.
|
||||
/// Otherwise just `await` once and build a `CheckReport` from the
|
||||
/// result — no helper required.
|
||||
pub async fn poll_until<F, Fut>(
|
||||
name: &'static str,
|
||||
budget: Duration,
|
||||
interval: Duration,
|
||||
check: F,
|
||||
) -> CheckReport
|
||||
where
|
||||
F: Fn() -> Fut,
|
||||
Fut: Future<Output = Result<(), String>>,
|
||||
{
|
||||
let deadline = Instant::now() + budget;
|
||||
loop {
|
||||
match check().await {
|
||||
Ok(()) => return CheckReport::pass(name),
|
||||
Err(e) => {
|
||||
if Instant::now() >= deadline {
|
||||
return CheckReport::fail(name, format!("did not pass within {budget:?}: {e}"));
|
||||
}
|
||||
tokio::time::sleep(interval).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// One TCP connect attempt against `addr`, gated by `timeout`.
|
||||
///
|
||||
/// Wrap with [`poll_until`] for retry semantics:
|
||||
///
|
||||
/// ```ignore
|
||||
/// poll_until("nats-reachable", Duration::from_secs(30), Duration::from_secs(1), || async {
|
||||
/// tcp_reachable("once", "nats:4222", Duration::from_secs(1))
|
||||
/// .await
|
||||
/// .passed
|
||||
/// .then_some(())
|
||||
/// .ok_or_else(|| "still not reachable".to_string())
|
||||
/// }).await
|
||||
/// ```
|
||||
pub async fn tcp_reachable(name: &'static str, addr: &str, timeout: Duration) -> CheckReport {
|
||||
match tokio::time::timeout(timeout, tokio::net::TcpStream::connect(addr)).await {
|
||||
Ok(Ok(_)) => CheckReport::pass(name),
|
||||
Ok(Err(e)) => CheckReport::fail(name, format!("connect failed: {e}")),
|
||||
Err(_) => CheckReport::fail(name, format!("timeout after {timeout:?}")),
|
||||
}
|
||||
}
|
||||
|
||||
/// What `deploy*` returns on success. `smoke` is `None` for the
|
||||
/// no-smoke variant, `Some` (with `passed() == true`) otherwise.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DeployOutcome {
|
||||
pub interpret: Outcome,
|
||||
pub smoke: Option<SmokeReport>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum DeployError {
|
||||
#[error("score interpret failed: {0}")]
|
||||
Interpret(#[from] InterpretError),
|
||||
#[error(
|
||||
"smoke failed: {} of {} check(s) did not pass",
|
||||
.report.failed().count(),
|
||||
.report.checks.len(),
|
||||
)]
|
||||
SmokeFailed {
|
||||
/// The Score deployed cleanly — preserved so the dashboard
|
||||
/// can render the success message next to the smoke failure.
|
||||
interpret: Outcome,
|
||||
report: SmokeReport,
|
||||
},
|
||||
}
|
||||
|
||||
/// Deploy a Score; do **not** run any smoke. Equivalent to calling
|
||||
/// `score.interpret(...)` directly — provided so callers can switch
|
||||
/// between smoke / no-smoke at one site.
|
||||
pub async fn deploy<S, T>(
|
||||
score: S,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<DeployOutcome, DeployError>
|
||||
where
|
||||
S: Score<T>,
|
||||
T: Topology,
|
||||
{
|
||||
let span = info_span!("deploy", score = score.name());
|
||||
let _g = span.enter();
|
||||
info!("deploy: starting interpret (no smoke)");
|
||||
let interpret = score.interpret(inventory, topology).await?;
|
||||
info!(status = %interpret.status, "deploy: interpret returned");
|
||||
Ok(DeployOutcome {
|
||||
interpret,
|
||||
smoke: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Deploy a Score and block on its smoke test (ADR-023 P4).
|
||||
///
|
||||
/// Returns `Ok` only when every check in the report passed. An empty
|
||||
/// report is rejected the same way a failing one is — a smoke impl
|
||||
/// that forgot to push checks fails the deploy rather than silently
|
||||
/// passing.
|
||||
pub async fn deploy_with_smoke<S, ST, T>(
|
||||
score: S,
|
||||
smoke: &ST,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<DeployOutcome, DeployError>
|
||||
where
|
||||
S: Score<T>,
|
||||
ST: SmokeTest<T>,
|
||||
T: Topology,
|
||||
{
|
||||
let span = info_span!("deploy", score = score.name(), smoke = true);
|
||||
let _g = span.enter();
|
||||
info!("deploy: starting interpret");
|
||||
let interpret = score.interpret(inventory, topology).await?;
|
||||
info!(status = %interpret.status, "deploy: interpret returned; running smoke");
|
||||
let report = smoke.verify(topology).await;
|
||||
info!(
|
||||
passed = report.passed(),
|
||||
checks = report.checks.len(),
|
||||
failed = report.failed().count(),
|
||||
"deploy: smoke returned",
|
||||
);
|
||||
if report.passed() {
|
||||
Ok(DeployOutcome {
|
||||
interpret,
|
||||
smoke: Some(report),
|
||||
})
|
||||
} else {
|
||||
Err(DeployError::SmokeFailed { interpret, report })
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
//! Behavioural tests for the minimal smoke shape.
|
||||
//!
|
||||
//! These exercise the deploy ↔ smoke handoff against a trivial
|
||||
//! in-process Score / Interpret / Topology so we don't depend on
|
||||
//! any external infrastructure. The fixtures are intentionally
|
||||
//! tiny — they only carry what each test actually touches.
|
||||
|
||||
use super::*;
|
||||
use async_trait::async_trait;
|
||||
use harmony::data::Version;
|
||||
use harmony::interpret::{InterpretName, InterpretStatus};
|
||||
use harmony::inventory::Inventory;
|
||||
use harmony::topology::{PreparationError, PreparationOutcome};
|
||||
use harmony_types::id::Id;
|
||||
use serde::Serialize;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NoopTopology;
|
||||
|
||||
#[async_trait]
|
||||
impl Topology for NoopTopology {
|
||||
fn name(&self) -> &str {
|
||||
"noop"
|
||||
}
|
||||
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
|
||||
Ok(PreparationOutcome::Noop)
|
||||
}
|
||||
}
|
||||
|
||||
/// A `Score` that records execute() calls and returns a configured
|
||||
/// outcome. Tracks invocations so a test can assert "interpret ran
|
||||
/// exactly once".
|
||||
#[derive(Debug, Clone)]
|
||||
struct CountingScore {
|
||||
executed: Arc<AtomicBool>,
|
||||
fail: bool,
|
||||
}
|
||||
|
||||
impl CountingScore {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
executed: Arc::new(AtomicBool::new(false)),
|
||||
fail: false,
|
||||
}
|
||||
}
|
||||
fn failing() -> Self {
|
||||
Self {
|
||||
executed: Arc::new(AtomicBool::new(false)),
|
||||
fail: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for CountingScore {
|
||||
fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
|
||||
s.serialize_str("CountingScore")
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Score<NoopTopology> for CountingScore {
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<NoopTopology>> {
|
||||
Box::new(CountingInterpret {
|
||||
executed: self.executed.clone(),
|
||||
fail: self.fail,
|
||||
})
|
||||
}
|
||||
fn name(&self) -> String {
|
||||
"CountingScore".into()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct CountingInterpret {
|
||||
executed: Arc<AtomicBool>,
|
||||
fail: bool,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Interpret<NoopTopology> for CountingInterpret {
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("counting")
|
||||
}
|
||||
fn get_version(&self) -> Version {
|
||||
Version::from("0.1.0").unwrap()
|
||||
}
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
InterpretStatus::QUEUED
|
||||
}
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
vec![]
|
||||
}
|
||||
async fn execute(
|
||||
&self,
|
||||
_i: &Inventory,
|
||||
_t: &NoopTopology,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
self.executed.store(true, Ordering::SeqCst);
|
||||
if self.fail {
|
||||
Err(InterpretError::new(
|
||||
"counting: configured to fail".to_string(),
|
||||
))
|
||||
} else {
|
||||
Ok(Outcome::success("counting: ok".to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A test smoke that returns a pre-recorded report and records
|
||||
/// whether `verify` was called. Lets each test inject the exact
|
||||
/// report shape it wants to drive `deploy_with_smoke` through.
|
||||
struct FixedSmoke {
|
||||
report: Mutex<Option<SmokeReport>>,
|
||||
verify_called: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl FixedSmoke {
|
||||
fn new(report: SmokeReport) -> Self {
|
||||
Self {
|
||||
report: Mutex::new(Some(report)),
|
||||
verify_called: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl SmokeTest<NoopTopology> for FixedSmoke {
|
||||
type Interpret = CountingInterpret;
|
||||
async fn verify(&self, _t: &NoopTopology) -> SmokeReport {
|
||||
self.verify_called.store(true, Ordering::SeqCst);
|
||||
self.report
|
||||
.lock()
|
||||
.unwrap()
|
||||
.take()
|
||||
.expect("verify called more than once")
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deploy_runs_interpret_and_returns_outcome() {
|
||||
let score = CountingScore::new();
|
||||
let executed = score.executed.clone();
|
||||
let outcome = deploy(score, &Inventory::empty(), &NoopTopology)
|
||||
.await
|
||||
.expect("clean interpret should produce Ok");
|
||||
assert!(executed.load(Ordering::SeqCst), "interpret must run");
|
||||
assert!(outcome.smoke.is_none(), "no-smoke variant has no report");
|
||||
assert_eq!(outcome.interpret.message, "counting: ok");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deploy_propagates_interpret_failure() {
|
||||
let result = deploy(CountingScore::failing(), &Inventory::empty(), &NoopTopology).await;
|
||||
assert!(matches!(result, Err(DeployError::Interpret(_))));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deploy_with_smoke_returns_report_on_success() {
|
||||
let score = CountingScore::new();
|
||||
let smoke = FixedSmoke::new(SmokeReport {
|
||||
checks: vec![CheckReport::pass("a"), CheckReport::pass("b")],
|
||||
});
|
||||
let outcome = deploy_with_smoke(score, &smoke, &Inventory::empty(), &NoopTopology)
|
||||
.await
|
||||
.expect("passing smoke should produce Ok");
|
||||
let report = outcome.smoke.expect("smoke report present on success");
|
||||
assert!(report.passed());
|
||||
assert_eq!(report.checks.len(), 2);
|
||||
assert!(smoke.verify_called.load(Ordering::SeqCst));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deploy_with_smoke_fails_when_any_check_fails() {
|
||||
// Pin the deploy-blocks-on-smoke contract: a single failed
|
||||
// check fails the whole deploy, regardless of how many
|
||||
// passed. The `interpret` field of the error must still be
|
||||
// populated so the dashboard can show what succeeded
|
||||
// alongside what didn't.
|
||||
let smoke = FixedSmoke::new(SmokeReport {
|
||||
checks: vec![
|
||||
CheckReport::pass("a"),
|
||||
CheckReport::fail("b", "bad"),
|
||||
CheckReport::pass("c"),
|
||||
],
|
||||
});
|
||||
let err = deploy_with_smoke(
|
||||
CountingScore::new(),
|
||||
&smoke,
|
||||
&Inventory::empty(),
|
||||
&NoopTopology,
|
||||
)
|
||||
.await
|
||||
.expect_err("failing smoke must error");
|
||||
match err {
|
||||
DeployError::SmokeFailed { report, interpret } => {
|
||||
assert!(!report.passed());
|
||||
assert_eq!(report.failed().count(), 1);
|
||||
assert_eq!(interpret.message, "counting: ok");
|
||||
}
|
||||
other => panic!("expected SmokeFailed, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deploy_with_smoke_rejects_empty_report() {
|
||||
// A smoke impl that forgot to push any checks must fail
|
||||
// loudly. Silent pass-through would defeat the entire point
|
||||
// of the contract.
|
||||
let smoke = FixedSmoke::new(SmokeReport::default());
|
||||
let err = deploy_with_smoke(
|
||||
CountingScore::new(),
|
||||
&smoke,
|
||||
&Inventory::empty(),
|
||||
&NoopTopology,
|
||||
)
|
||||
.await
|
||||
.expect_err("empty smoke must error");
|
||||
assert!(matches!(err, DeployError::SmokeFailed { .. }));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deploy_with_smoke_skips_smoke_when_interpret_fails() {
|
||||
// Interpret error short-circuits — running smoke against a
|
||||
// half-deployed component would produce confusing cascade
|
||||
// failures.
|
||||
let smoke = FixedSmoke::new(SmokeReport {
|
||||
checks: vec![CheckReport::pass("never-runs")],
|
||||
});
|
||||
let verify_called = smoke.verify_called.clone();
|
||||
let err = deploy_with_smoke(
|
||||
CountingScore::failing(),
|
||||
&smoke,
|
||||
&Inventory::empty(),
|
||||
&NoopTopology,
|
||||
)
|
||||
.await
|
||||
.expect_err("interpret failure must error");
|
||||
assert!(matches!(err, DeployError::Interpret(_)));
|
||||
assert!(
|
||||
!verify_called.load(Ordering::SeqCst),
|
||||
"smoke must NOT run when interpret fails",
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn smoke_report_passed_requires_nonempty_and_all_pass() {
|
||||
assert!(!SmokeReport::default().passed());
|
||||
assert!(
|
||||
!SmokeReport {
|
||||
checks: vec![CheckReport::pass("a"), CheckReport::fail("b", "x")],
|
||||
}
|
||||
.passed()
|
||||
);
|
||||
assert!(
|
||||
SmokeReport {
|
||||
checks: vec![CheckReport::pass("a"), CheckReport::pass("b")],
|
||||
}
|
||||
.passed()
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn poll_until_returns_pass_when_check_succeeds_within_budget() {
|
||||
let calls = Arc::new(AtomicBool::new(false));
|
||||
let c = calls.clone();
|
||||
let check = poll_until(
|
||||
"eventually-ok",
|
||||
Duration::from_secs(2),
|
||||
Duration::from_millis(10),
|
||||
move || {
|
||||
let c = c.clone();
|
||||
async move {
|
||||
if c.swap(true, Ordering::SeqCst) {
|
||||
Ok(())
|
||||
} else {
|
||||
Err("not yet".into())
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
.await;
|
||||
assert!(check.passed);
|
||||
assert_eq!(check.name, "eventually-ok");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn poll_until_returns_fail_after_budget_elapses() {
|
||||
let check = poll_until(
|
||||
"never-ok",
|
||||
Duration::from_millis(50),
|
||||
Duration::from_millis(10),
|
||||
|| async { Err("still nope".into()) },
|
||||
)
|
||||
.await;
|
||||
assert!(!check.passed);
|
||||
assert!(
|
||||
check
|
||||
.detail
|
||||
.as_deref()
|
||||
.is_some_and(|d| d.contains("still nope")),
|
||||
"detail must carry the last error, got {:?}",
|
||||
check.detail,
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tcp_reachable_passes_against_real_listener() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr: SocketAddr = listener.local_addr().unwrap();
|
||||
// Accept loop so connect() completes; otherwise SYN-RST.
|
||||
tokio::spawn(async move {
|
||||
let _ = listener.accept().await;
|
||||
});
|
||||
|
||||
let check = tcp_reachable("loopback", &addr.to_string(), Duration::from_secs(1)).await;
|
||||
assert!(check.passed, "loopback connect must succeed: {check:?}");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn tcp_reachable_fails_with_timeout_when_no_listener() {
|
||||
// RFC 5737 TEST-NET-1: guaranteed-non-routable; a SYN here
|
||||
// will sit unanswered and we'll hit our own timeout deterministically.
|
||||
let check = tcp_reachable("dead-net", "192.0.2.1:1", Duration::from_millis(150)).await;
|
||||
assert!(!check.passed);
|
||||
assert!(
|
||||
check
|
||||
.detail
|
||||
.as_deref()
|
||||
.is_some_and(|d| d.contains("timeout") || d.contains("connect failed")),
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deploy_outcome_smoke_field_is_none_for_no_smoke_deploy() {
|
||||
let outcome = deploy(CountingScore::new(), &Inventory::empty(), &NoopTopology)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(outcome.smoke.is_none());
|
||||
}
|
||||
}
|
||||
@@ -1,82 +0,0 @@
|
||||
//! The Score → SmokeTest pairing contract.
|
||||
//!
|
||||
//! [`SmokeTest`] is the companion trait that pairs a [`Score`] with
|
||||
//! the probes that verify *that specific* Score's deployment worked.
|
||||
//! The pairing is type-level via the `Score` associated type — the
|
||||
//! compiler refuses to call
|
||||
//! [`deploy_with_smoke`](super::deploy::deploy_with_smoke) with a
|
||||
//! `FleetOperatorSmokeTest` for a `DnsScore`, because their
|
||||
//! associated types don't match. This is the same trick that makes
|
||||
//! `K8sBareTopology: PostgreSQL` an unsatisfied trait bound when you
|
||||
//! accidentally try to run PostgreSQL on a topology that doesn't
|
||||
//! advertise it — see ADR-024 §2 and JG's *For the Love of
|
||||
//! Compilers* talk.
|
||||
//!
|
||||
//! ## Why not a method on `Score`?
|
||||
//!
|
||||
//! Per ADR-023 P7, framework capabilities attach as companions, not
|
||||
//! as new methods on `Score`/`Interpret`. The base trait surface
|
||||
//! stays one method (`create_interpret`). Smoke is *something paired
|
||||
//! with* a Score, not *something on* a Score. The orthogonal
|
||||
//! benefit: a Score can be deployed without smoke, can be deployed
|
||||
//! with one of several smoke configurations (dev, prod, e2e), or
|
||||
//! can have its smoke evolved independently of its interpret logic.
|
||||
//!
|
||||
//! ## Why `assemble` returns a value, not a future-of-checks?
|
||||
//!
|
||||
//! The probes a smoke test wants to run are usually known up front
|
||||
//! from the Score's data — "the service name we just deployed,
|
||||
//! check it on the cluster's default DNS". Resolving that into a
|
||||
//! concrete `SmokeSuite` *before* the probes run means the dashboard
|
||||
//! can render the pipeline (probe names, stage count) the moment
|
||||
//! smoke starts. If it returned a `Stream<Item = ProbeReport>`, the
|
||||
//! pipeline shape wouldn't be knowable until the run completed.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use thiserror::Error;
|
||||
|
||||
use harmony::score::Score;
|
||||
use harmony::topology::Topology;
|
||||
|
||||
use super::suite::SmokeSuite;
|
||||
|
||||
/// Pair a [`Score`] with the [`SmokeSuite`] that proves its deploy
|
||||
/// converged.
|
||||
///
|
||||
/// Implementors live next to the Score they pair with — the
|
||||
/// `FleetOperatorSmokeTest` ships from this crate alongside
|
||||
/// `FleetOperatorScore`, the same way [`crate::AgentObservation`]
|
||||
/// lives alongside `FleetAgentScore`. This is the companion
|
||||
/// placement rule from ADR-023 §"Extend Scores with companions".
|
||||
#[async_trait]
|
||||
pub trait SmokeTest<T: Topology>: Send + Sync {
|
||||
/// The Score this smoke test verifies. The type lock means
|
||||
/// `SM::Score = S` is enforced at every call site.
|
||||
type Score: Score<T>;
|
||||
|
||||
/// Build the [`SmokeSuite`] that will run after the Score's
|
||||
/// `interpret` succeeds. Has access to the Score so probes can
|
||||
/// be parameterized on the deploy's actual inputs (service
|
||||
/// names, namespaces, ports), and to the topology so probes can
|
||||
/// pull credentials / endpoints from declared capabilities.
|
||||
async fn assemble(
|
||||
&self,
|
||||
score: &Self::Score,
|
||||
topology: &T,
|
||||
) -> Result<SmokeSuite<T>, SmokeAssemblyError>;
|
||||
}
|
||||
|
||||
/// Why a smoke test failed to *build* (before any probe runs).
|
||||
///
|
||||
/// Distinct from probe failure — assembly failure means the Score
|
||||
/// is in an unexpected shape (missing endpoint, can't resolve a
|
||||
/// reference). The dashboard shows this differently from a probe
|
||||
/// failure: it's an authoring / configuration error, not a deployed-
|
||||
/// system error.
|
||||
#[derive(Debug, Clone, Error, PartialEq, Eq)]
|
||||
pub enum SmokeAssemblyError {
|
||||
#[error("smoke test could not derive endpoint for {what}: {detail}")]
|
||||
MissingEndpoint { what: String, detail: String },
|
||||
#[error("smoke test assembly failed: {0}")]
|
||||
Other(String),
|
||||
}
|
||||
@@ -1,387 +0,0 @@
|
||||
//! `deploy` and `deploy_with_smoke` — the framework-glue wrappers
|
||||
//! that bind a [`Score`]'s interpret to an optional [`SmokeTest`].
|
||||
//!
|
||||
//! These are free functions, not methods on [`Score`]/[`Maestro`]
|
||||
//! /[`Interpret`]. The framework's domain directory is left
|
||||
//! untouched in this PR (ADR-023 P7 — "extend Scores with companions,
|
||||
//! not API changes"). If a later PR proves this shape, promoting
|
||||
//! `deploy_with_smoke` to a `Maestro` convenience is a one-line
|
||||
//! additive change.
|
||||
//!
|
||||
//! ## Opting in or out
|
||||
//!
|
||||
//! Two entry points, one decision at the call site:
|
||||
//!
|
||||
//! - [`deploy`] — run the Score's interpret, no smoke.
|
||||
//! - [`deploy_with_smoke`] — run the Score's interpret, then the
|
||||
//! paired [`SmokeTest`]. Returns only after the smoke run
|
||||
//! completes; a failed smoke is a deploy error per the project
|
||||
//! rule "deploy blocks on smoke-test".
|
||||
//!
|
||||
//! Callers (CLI binaries, the e2e harness, the operator's
|
||||
//! reconciler) choose which to call. There is no global config knob
|
||||
//! and no per-Score override — the choice is one function call.
|
||||
|
||||
use harmony::interpret::{InterpretError, Outcome};
|
||||
use harmony::inventory::Inventory;
|
||||
use harmony::score::Score;
|
||||
use harmony::topology::Topology;
|
||||
use thiserror::Error;
|
||||
use tracing::{info, info_span};
|
||||
|
||||
use super::contract::{SmokeAssemblyError, SmokeTest};
|
||||
use super::suite::SmokeReport;
|
||||
|
||||
/// What [`deploy`]/[`deploy_with_smoke`] return on success.
|
||||
///
|
||||
/// `interpret` is the underlying `Score::interpret` result —
|
||||
/// preserved verbatim so callers that previously consumed `Outcome`
|
||||
/// directly continue to work after switching to these wrappers.
|
||||
/// `smoke` is `None` when the no-smoke variant was used; `Some`
|
||||
/// otherwise, with `passed() == true` guaranteed (a failed smoke is
|
||||
/// a [`DeployError::SmokeFailed`]).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DeployOutcome {
|
||||
pub interpret: Outcome,
|
||||
pub smoke: Option<SmokeReport>,
|
||||
}
|
||||
|
||||
/// All the ways a deploy can fail. Three arms, each pointing at a
|
||||
/// concrete cause the operator can act on:
|
||||
///
|
||||
/// - `Interpret` — the Score's own work failed (helm error, kube
|
||||
/// apply error, etc). Same shape callers got before smoke existed.
|
||||
/// - `SmokeAssembly` — the Score deployed, but its `SmokeTest`
|
||||
/// couldn't even *build* a suite. Typically means the test's
|
||||
/// author is missing an endpoint reference; an authoring bug, not
|
||||
/// a runtime bug.
|
||||
/// - `SmokeFailed` — the Score deployed and the suite built, but
|
||||
/// one or more probes failed. The full [`SmokeReport`] is
|
||||
/// attached so the dashboard can render which stages passed and
|
||||
/// which didn't, instead of a single opaque "deploy failed".
|
||||
#[derive(Debug, Error)]
|
||||
pub enum DeployError {
|
||||
#[error("score interpret failed: {0}")]
|
||||
Interpret(#[from] InterpretError),
|
||||
#[error("smoke assembly failed: {0}")]
|
||||
SmokeAssembly(#[from] SmokeAssemblyError),
|
||||
#[error("smoke test failed ({} of {} probe(s) did not pass)", .report.failed().count(), .report.probes.len())]
|
||||
SmokeFailed {
|
||||
/// The Score deployed cleanly — `interpret` is preserved so
|
||||
/// callers (and the dashboard) can render the underlying
|
||||
/// success message alongside the smoke failure.
|
||||
interpret: Outcome,
|
||||
report: SmokeReport,
|
||||
},
|
||||
}
|
||||
|
||||
/// Deploy a Score without running any smoke checks.
|
||||
///
|
||||
/// Returns immediately after `score.interpret` returns. Equivalent
|
||||
/// to calling `score.interpret(...)` directly — provided here so
|
||||
/// callers can switch between smoke / no-smoke without changing
|
||||
/// the surrounding control flow.
|
||||
pub async fn deploy<T, S>(
|
||||
score: S,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<DeployOutcome, DeployError>
|
||||
where
|
||||
T: Topology,
|
||||
S: Score<T>,
|
||||
{
|
||||
let span = info_span!("deploy", score = score.name());
|
||||
let _g = span.enter();
|
||||
info!("deploy: starting interpret (no smoke)");
|
||||
let interpret = score.interpret(inventory, topology).await?;
|
||||
info!(status = %interpret.status, "deploy: interpret returned");
|
||||
Ok(DeployOutcome {
|
||||
interpret,
|
||||
smoke: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Deploy a Score, then run its paired SmokeTest. Block on the
|
||||
/// suite — see project rule "deploy blocks on smoke-test".
|
||||
///
|
||||
/// The type bound `SM: SmokeTest<T, Score = S>` is where the
|
||||
/// compile-time pairing lives: the `SmokeTest` you pass must
|
||||
/// declare *this* Score type as its associated `Score`. A mismatch
|
||||
/// is rejected at the call site, not at runtime. (See ADR-024 §2
|
||||
/// and JG's compile-time-feedback principle.)
|
||||
pub async fn deploy_with_smoke<T, S, SM>(
|
||||
score: S,
|
||||
smoke: SM,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<DeployOutcome, DeployError>
|
||||
where
|
||||
T: Topology,
|
||||
S: Score<T>,
|
||||
SM: SmokeTest<T, Score = S>,
|
||||
{
|
||||
let span = info_span!("deploy", score = score.name(), smoke = true);
|
||||
let _g = span.enter();
|
||||
|
||||
info!("deploy: starting interpret");
|
||||
let interpret = score.interpret(inventory, topology).await?;
|
||||
info!(status = %interpret.status, "deploy: interpret returned, assembling smoke");
|
||||
|
||||
let suite = smoke.assemble(&score, topology).await?;
|
||||
info!(stages = suite.len(), "deploy: smoke suite assembled");
|
||||
|
||||
let report = suite.run(topology).await;
|
||||
info!(
|
||||
passed = report.passed(),
|
||||
elapsed_ms = report.elapsed.as_millis() as u64,
|
||||
"deploy: smoke finished",
|
||||
);
|
||||
|
||||
if !report.passed() {
|
||||
return Err(DeployError::SmokeFailed { interpret, report });
|
||||
}
|
||||
|
||||
Ok(DeployOutcome {
|
||||
interpret,
|
||||
smoke: Some(report),
|
||||
})
|
||||
}
|
||||
|
||||
// ---- tests -----------------------------------------------------------------
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::companion::smoke::probe::{Probe, ProbeAttempt, ProbeName, RetryPolicy};
|
||||
use crate::companion::smoke::suite::SmokeSuite;
|
||||
use async_trait::async_trait;
|
||||
use harmony::data::Version;
|
||||
use harmony::interpret::{Interpret, InterpretName, InterpretStatus};
|
||||
use harmony::topology::PreparationOutcome;
|
||||
use harmony_types::id::Id;
|
||||
use serde::Serialize;
|
||||
use std::sync::Mutex;
|
||||
|
||||
// -- test fixtures: a NoopTopology + minimal Score + minimal Probe --------
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NoopTopology;
|
||||
|
||||
#[async_trait]
|
||||
impl Topology for NoopTopology {
|
||||
fn name(&self) -> &str {
|
||||
"noop"
|
||||
}
|
||||
async fn ensure_ready(
|
||||
&self,
|
||||
) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
|
||||
Ok(PreparationOutcome::Noop)
|
||||
}
|
||||
}
|
||||
|
||||
/// Minimal Score whose interpret can be scripted to succeed or
|
||||
/// fail. Demonstrates that `deploy_with_smoke` short-circuits
|
||||
/// correctly when interpret fails.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
struct TrivialScore {
|
||||
label: String,
|
||||
should_fail: bool,
|
||||
}
|
||||
|
||||
impl<T: Topology> Score<T> for TrivialScore {
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(TrivialInterpret {
|
||||
should_fail: self.should_fail,
|
||||
label: self.label.clone(),
|
||||
})
|
||||
}
|
||||
fn name(&self) -> String {
|
||||
format!("TrivialScore({})", self.label)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TrivialInterpret {
|
||||
should_fail: bool,
|
||||
label: String,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology> Interpret<T> for TrivialInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
_inventory: &Inventory,
|
||||
_topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
if self.should_fail {
|
||||
Err(InterpretError::new(format!(
|
||||
"scripted failure: {}",
|
||||
self.label
|
||||
)))
|
||||
} else {
|
||||
Ok(Outcome::success(format!("deployed {}", self.label)))
|
||||
}
|
||||
}
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("TrivialInterpret")
|
||||
}
|
||||
fn get_version(&self) -> Version {
|
||||
Version::from("0.0.0").unwrap()
|
||||
}
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
InterpretStatus::QUEUED
|
||||
}
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
|
||||
/// A probe whose every attempt is hardcoded. Lets us script
|
||||
/// "this stage passes" or "this stage rejects" in tests.
|
||||
#[derive(Debug)]
|
||||
struct Always {
|
||||
name: ProbeName,
|
||||
attempt: ProbeAttempt,
|
||||
}
|
||||
impl Always {
|
||||
fn new(name: &str, attempt: ProbeAttempt) -> Self {
|
||||
Self {
|
||||
name: ProbeName::try_new(name).unwrap(),
|
||||
attempt,
|
||||
}
|
||||
}
|
||||
}
|
||||
#[async_trait]
|
||||
impl<T: Topology> Probe<T> for Always {
|
||||
fn name(&self) -> &ProbeName {
|
||||
&self.name
|
||||
}
|
||||
async fn check(&self, _t: &T) -> ProbeAttempt {
|
||||
self.attempt.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// The shape we expect every fleet smoke test to follow: it
|
||||
/// owns no state beyond what the Score gives it.
|
||||
#[derive(Debug)]
|
||||
struct TrivialSmoke {
|
||||
outcomes: Mutex<Vec<ProbeAttempt>>,
|
||||
}
|
||||
impl TrivialSmoke {
|
||||
fn new(outcomes: Vec<ProbeAttempt>) -> Self {
|
||||
Self {
|
||||
outcomes: Mutex::new(outcomes),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology> SmokeTest<T> for TrivialSmoke {
|
||||
type Score = TrivialScore;
|
||||
async fn assemble(
|
||||
&self,
|
||||
score: &Self::Score,
|
||||
_topology: &T,
|
||||
) -> Result<SmokeSuite<T>, SmokeAssemblyError> {
|
||||
let mut suite = SmokeSuite::new();
|
||||
for (i, attempt) in self.outcomes.lock().unwrap().iter().enumerate() {
|
||||
let name = format!("{}-probe-{i}", score.label);
|
||||
suite = suite.stage(Always::new(&name, attempt.clone()), RetryPolicy::once());
|
||||
}
|
||||
Ok(suite)
|
||||
}
|
||||
}
|
||||
|
||||
// -- the actual contract tests --------------------------------------------
|
||||
|
||||
fn inv() -> Inventory {
|
||||
Inventory::empty()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn deploy_without_smoke_returns_outcome_and_no_report() {
|
||||
let s = TrivialScore {
|
||||
label: "alpha".to_string(),
|
||||
should_fail: false,
|
||||
};
|
||||
let r = deploy(s, &inv(), &NoopTopology).await.expect("deploy ok");
|
||||
assert_eq!(r.interpret.status, InterpretStatus::SUCCESS);
|
||||
assert!(
|
||||
r.smoke.is_none(),
|
||||
"no-smoke variant must not synthesize a report"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn interpret_failure_short_circuits_before_smoke_assembly() {
|
||||
// If interpret fails, we MUST NOT call `smoke.assemble` —
|
||||
// there is nothing deployed to verify, and running smoke
|
||||
// would produce confusing cascade failures in the report.
|
||||
let s = TrivialScore {
|
||||
label: "beta".to_string(),
|
||||
should_fail: true,
|
||||
};
|
||||
// Use a smoke that would panic if assembled — proves we
|
||||
// never got there.
|
||||
struct PanicSmoke;
|
||||
#[async_trait]
|
||||
impl<T: Topology> SmokeTest<T> for PanicSmoke {
|
||||
type Score = TrivialScore;
|
||||
async fn assemble(
|
||||
&self,
|
||||
_: &Self::Score,
|
||||
_: &T,
|
||||
) -> Result<SmokeSuite<T>, SmokeAssemblyError> {
|
||||
panic!("assemble must not be called when interpret fails");
|
||||
}
|
||||
}
|
||||
match deploy_with_smoke(s, PanicSmoke, &inv(), &NoopTopology).await {
|
||||
Err(DeployError::Interpret(e)) => {
|
||||
assert!(format!("{e}").contains("scripted failure"));
|
||||
}
|
||||
other => panic!("expected Interpret error, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn all_probes_pass_yields_deployoutcome_with_report() {
|
||||
let s = TrivialScore {
|
||||
label: "gamma".to_string(),
|
||||
should_fail: false,
|
||||
};
|
||||
let smoke = TrivialSmoke::new(vec![
|
||||
ProbeAttempt::Ok(Some("first stage ok".to_string())),
|
||||
ProbeAttempt::Ok(None),
|
||||
]);
|
||||
let r = deploy_with_smoke(s, smoke, &inv(), &NoopTopology)
|
||||
.await
|
||||
.expect("deploy + smoke ok");
|
||||
let report = r.smoke.expect("smoke report present when smoke runs");
|
||||
assert!(report.passed());
|
||||
assert_eq!(report.probes.len(), 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn probe_failure_blocks_deploy() {
|
||||
let s = TrivialScore {
|
||||
label: "delta".to_string(),
|
||||
should_fail: false,
|
||||
};
|
||||
let smoke = TrivialSmoke::new(vec![
|
||||
ProbeAttempt::Ok(None),
|
||||
ProbeAttempt::Fatal("definitely broken".to_string()),
|
||||
]);
|
||||
match deploy_with_smoke(s, smoke, &inv(), &NoopTopology).await {
|
||||
Err(DeployError::SmokeFailed { interpret, report }) => {
|
||||
// The Score itself succeeded — that's preserved, so
|
||||
// the dashboard can show "deploy converged, smoke
|
||||
// disagrees".
|
||||
assert_eq!(interpret.status, InterpretStatus::SUCCESS);
|
||||
assert!(!report.passed());
|
||||
assert_eq!(report.failed().count(), 1);
|
||||
assert_eq!(report.probes.last().unwrap().name.as_str(), "delta-probe-1");
|
||||
}
|
||||
other => panic!("expected SmokeFailed, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,62 +0,0 @@
|
||||
//! Smoke-test contract — the second Score companion (ADR-023 P7,
|
||||
//! ADR-023 P4 "deploy returns only after smoke-test success").
|
||||
//!
|
||||
//! ## The big picture
|
||||
//!
|
||||
//! ```text
|
||||
//! ┌────────────┐ pair ┌──────────────┐ assemble ┌────────────┐
|
||||
//! │ *Score │◀─type─────│ SmokeTest │─────────────▶│ SmokeSuite │
|
||||
//! └────────────┘ locked └──────────────┘ └────────────┘
|
||||
//! ▲ │
|
||||
//! │ │
|
||||
//! deploy_with_smoke ─── interpret ─── then ── run probes ──▶ SmokeReport
|
||||
//! ```
|
||||
//!
|
||||
//! - [`Probe`] is the single-attempt unit, classifying its
|
||||
//! observation as [`Ok`/`Retry`/`Fatal`](probe::ProbeAttempt). It's
|
||||
//! also the stage the dashboard renders.
|
||||
//! - [`SmokeSuite`] is an ordered sequence of probes with per-stage
|
||||
//! [`RetryPolicy`]. Sequential by design; parallel stages deferred.
|
||||
//! - [`SmokeTest`] pairs a [`Score`](harmony::score::Score) with a
|
||||
//! suite by associated type — a `SmokeTest<Score = FleetOperatorScore>`
|
||||
//! cannot be passed to `deploy_with_smoke(DnsScore, …)` because the
|
||||
//! compiler refuses the mismatch.
|
||||
//! - [`deploy`] / [`deploy_with_smoke`] are free functions that
|
||||
//! bind a Score's interpret to an optional SmokeTest. Returns
|
||||
//! only after the smoke run completes — see
|
||||
//! [`DeployError::SmokeFailed`] for the failure shape.
|
||||
//!
|
||||
//! ## Where this lives
|
||||
//!
|
||||
//! Inside `harmony-fleet-deploy` for Phase 0, alongside the
|
||||
//! existing [`AgentObservation`](super::AgentObservation) companion.
|
||||
//! When a second consumer (PostgreSQL deploy, monitoring stack)
|
||||
//! adopts this contract, the module promotes to a top-level
|
||||
//! `harmony-smoke` crate with no API churn — the move is mechanical
|
||||
//! because nothing in here depends on fleet types.
|
||||
//!
|
||||
//! ## What this does NOT do
|
||||
//!
|
||||
//! - **No new `HarmonyEvent` variants in Phase 0.** Stage progress
|
||||
//! is emitted via `tracing::info!` events inside `info_span!`s.
|
||||
//! A later PR can add `HarmonyEvent::SmokeStage{Started,Finished}`
|
||||
//! if the dashboard wants the typed seam.
|
||||
//! - **No `Score`/`Interpret` trait edits.** Per ADR-023 P7. The
|
||||
//! smoke seam is purely companion-shaped.
|
||||
//! - **No CLI flag yet.** Phase 1 wires `harmony-fleet-deploy <cmd>
|
||||
//! --no-smoke` once a real `FleetOperatorSmokeTest` exists.
|
||||
|
||||
pub mod contract;
|
||||
pub mod deploy;
|
||||
pub mod probe;
|
||||
pub mod probes;
|
||||
pub mod suite;
|
||||
|
||||
pub use contract::{SmokeAssemblyError, SmokeTest};
|
||||
pub use deploy::{DeployError, DeployOutcome, deploy, deploy_with_smoke};
|
||||
pub use probe::{
|
||||
InvalidProbeName, Probe, ProbeAttempt, ProbeFailure, ProbeName, ProbeOutcome, RetryPolicy,
|
||||
run_probe,
|
||||
};
|
||||
pub use probes::TcpReachable;
|
||||
pub use suite::{ProbeReport, SmokeReport, SmokeSuite};
|
||||
@@ -1,515 +0,0 @@
|
||||
//! Probe contract — the single-attempt unit of a smoke run.
|
||||
//!
|
||||
//! A [`Probe`] is what gets visualized as one stage of the smoke
|
||||
//! pipeline. Each implementation answers one question against a
|
||||
//! deployed system: "is this port reachable", "did this pod become
|
||||
//! ready", "is this KV key set to the value we expect".
|
||||
//!
|
||||
//! The retry loop is **not** a probe responsibility. A probe's
|
||||
//! [`check`](Probe::check) returns one of three classifications via
|
||||
//! [`ProbeAttempt`] — `Ok`, `Retry`, or `Fatal` — and [`run_probe`]
|
||||
//! drives the polling timer + timeout budget around it. This keeps
|
||||
//! probes single-responsibility and makes their unit tests
|
||||
//! deterministic.
|
||||
//!
|
||||
//! Each public type here is cardinality-matched to the concept it
|
||||
//! represents (see ADR-024 §2 and JG's *For the Love of Compilers*
|
||||
//! talk). `ProbeName` is a validated newtype, not `String`, because
|
||||
//! "an empty probe name" or "a probe name with a newline in it"
|
||||
//! cannot mean anything useful and would corrupt the dashboard
|
||||
//! rendering. `ProbeAttempt` is a three-arm sum, not `Result<(), Error>`,
|
||||
//! because "not ready yet" and "definitively no" must drive different
|
||||
//! orchestration paths.
|
||||
|
||||
use std::fmt;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use thiserror::Error;
|
||||
use tracing::{debug, warn};
|
||||
|
||||
use harmony::topology::Topology;
|
||||
|
||||
/// One attempted check by a probe.
|
||||
///
|
||||
/// The probe classifies its own observation rather than letting the
|
||||
/// orchestrator guess: a 5xx response is fundamentally different from
|
||||
/// a connection refused, even if both look like "failure" to a naive
|
||||
/// caller. The orchestrator ([`run_probe`]) uses this classification
|
||||
/// to decide whether to keep polling or bail out.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ProbeAttempt {
|
||||
/// The probe's invariant holds *right now*. Optional detail is
|
||||
/// surfaced in the smoke report (e.g. "HTTP 200 in 12 ms",
|
||||
/// "found 3 ready pods").
|
||||
Ok(Option<String>),
|
||||
/// The probe didn't observe what it's looking for yet, but the
|
||||
/// state is consistent with eventual success. Keep polling until
|
||||
/// the timeout. The string is the most recent observation, kept
|
||||
/// so that a final timeout can quote the last thing we saw.
|
||||
Retry(String),
|
||||
/// The probe got a definitive negative answer — no amount of
|
||||
/// further polling will change it. Skip the rest of the budget
|
||||
/// and report failure now. Example: HTTP 404 from a URL that
|
||||
/// must exist after deploy, or a `Service` with the wrong
|
||||
/// selector.
|
||||
Fatal(String),
|
||||
}
|
||||
|
||||
/// Result of a full probe run after [`run_probe`] has consumed the
|
||||
/// retry budget (or short-circuited).
|
||||
///
|
||||
/// This is the per-probe shape that lands in
|
||||
/// [`crate::companion::smoke::SmokeReport`]. The orchestrator wraps
|
||||
/// it with the [`ProbeName`] and the elapsed time so the dashboard
|
||||
/// can render each stage independently.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub enum ProbeOutcome {
|
||||
/// At least one attempt returned [`ProbeAttempt::Ok`] within the
|
||||
/// retry budget.
|
||||
Pass {
|
||||
elapsed: Duration,
|
||||
attempts: u32,
|
||||
detail: Option<String>,
|
||||
},
|
||||
/// Either we exhausted the budget with only `Retry`s (timeout)
|
||||
/// or a single `Fatal` short-circuited us (rejected).
|
||||
Fail {
|
||||
elapsed: Duration,
|
||||
attempts: u32,
|
||||
failure: ProbeFailure,
|
||||
},
|
||||
}
|
||||
|
||||
impl ProbeOutcome {
|
||||
pub fn passed(&self) -> bool {
|
||||
matches!(self, Self::Pass { .. })
|
||||
}
|
||||
}
|
||||
|
||||
/// Why a probe failed. Two arms, deliberately — they map to two
|
||||
/// different operator actions: "I waited and nothing happened" needs
|
||||
/// a longer budget or a stuck-component investigation; "something
|
||||
/// gave me a hard no" needs a configuration fix.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Error)]
|
||||
pub enum ProbeFailure {
|
||||
#[error("timed out after {budget:?} (last observation: {last_observation})")]
|
||||
Timeout {
|
||||
budget: Duration,
|
||||
last_observation: String,
|
||||
},
|
||||
#[error("rejected: {detail}")]
|
||||
Rejected { detail: String },
|
||||
}
|
||||
|
||||
/// Polling parameters for one probe.
|
||||
///
|
||||
/// `interval` is the wait between attempts. `timeout` is the total
|
||||
/// wall-clock budget from the first attempt to the last; the loop
|
||||
/// will not start a new attempt past this budget but will let an
|
||||
/// in-flight one finish.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub struct RetryPolicy {
|
||||
interval: Duration,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
impl RetryPolicy {
|
||||
/// Standard polling shape: try, wait `interval`, try again, until
|
||||
/// either an `Ok`/`Fatal` happens or `timeout` elapses.
|
||||
///
|
||||
/// Panics if `interval` is zero — a zero interval combined with
|
||||
/// any non-trivial check would spin the executor and produce no
|
||||
/// useful retry semantics. Express "run once" via [`Self::once`]
|
||||
/// instead.
|
||||
pub fn polling(interval: Duration, timeout: Duration) -> Self {
|
||||
assert!(
|
||||
!interval.is_zero(),
|
||||
"RetryPolicy::polling requires a non-zero interval; use RetryPolicy::once for a single attempt",
|
||||
);
|
||||
Self { interval, timeout }
|
||||
}
|
||||
|
||||
/// Single-shot: one attempt, no retries. Useful for probes whose
|
||||
/// success criterion is binary the moment we check (e.g. "is
|
||||
/// this string equal to that string").
|
||||
pub fn once() -> Self {
|
||||
Self {
|
||||
interval: Duration::from_millis(1),
|
||||
timeout: Duration::ZERO,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn interval(&self) -> Duration {
|
||||
self.interval
|
||||
}
|
||||
|
||||
pub fn timeout(&self) -> Duration {
|
||||
self.timeout
|
||||
}
|
||||
}
|
||||
|
||||
/// A validated probe name.
|
||||
///
|
||||
/// Probe names appear in `tracing` events, error messages, the smoke
|
||||
/// report, and the eventual dashboard. Constraining them at
|
||||
/// construction means the dashboard never has to defend against
|
||||
/// surprising bytes (control chars, newlines, etc).
|
||||
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub struct ProbeName(String);
|
||||
|
||||
#[derive(Debug, Error, PartialEq, Eq)]
|
||||
pub enum InvalidProbeName {
|
||||
#[error("probe name must not be empty")]
|
||||
Empty,
|
||||
#[error("probe name must not exceed 128 bytes (got {0})")]
|
||||
TooLong(usize),
|
||||
#[error("probe name must not contain control characters")]
|
||||
ControlChar,
|
||||
}
|
||||
|
||||
impl ProbeName {
|
||||
pub fn try_new(s: impl Into<String>) -> Result<Self, InvalidProbeName> {
|
||||
let s = s.into();
|
||||
if s.is_empty() {
|
||||
return Err(InvalidProbeName::Empty);
|
||||
}
|
||||
if s.len() > 128 {
|
||||
return Err(InvalidProbeName::TooLong(s.len()));
|
||||
}
|
||||
if s.chars().any(|c| c.is_control()) {
|
||||
return Err(InvalidProbeName::ControlChar);
|
||||
}
|
||||
Ok(Self(s))
|
||||
}
|
||||
|
||||
pub fn as_str(&self) -> &str {
|
||||
&self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ProbeName {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str(&self.0)
|
||||
}
|
||||
}
|
||||
|
||||
/// One pipeline stage of a [`SmokeSuite`](super::SmokeSuite).
|
||||
///
|
||||
/// Object-safe so a suite can hold a heterogeneous `Vec<Box<dyn
|
||||
/// Probe<T>>>`. The associated retry behavior is supplied separately
|
||||
/// by the suite (via [`RetryPolicy`]) so that the same probe type
|
||||
/// can be reused with different budgets in different suites.
|
||||
#[async_trait]
|
||||
pub trait Probe<T: Topology>: Send + Sync + std::fmt::Debug {
|
||||
fn name(&self) -> &ProbeName;
|
||||
async fn check(&self, topology: &T) -> ProbeAttempt;
|
||||
}
|
||||
|
||||
/// Drive a probe through its retry policy and return the consolidated
|
||||
/// outcome.
|
||||
///
|
||||
/// Behavior:
|
||||
///
|
||||
/// - Every `interval` we run `probe.check(...)`. A successful attempt
|
||||
/// wins immediately; a `Fatal` short-circuits to `Rejected`; a
|
||||
/// `Retry` records its observation and we wait.
|
||||
/// - If `timeout` is `Duration::ZERO` we run exactly one attempt
|
||||
/// (the "once" shape).
|
||||
/// - Otherwise we keep attempting until the elapsed wall time
|
||||
/// exceeds `timeout`. The last `Retry` observation becomes the
|
||||
/// `Timeout::last_observation` field so the failure message tells
|
||||
/// the operator what we kept seeing.
|
||||
///
|
||||
/// The function emits a `tracing` event per attempt at `debug` level
|
||||
/// and one at `warn` for the final failure (so a Phase-1 dashboard
|
||||
/// can subscribe via a `tracing` layer without us needing to add a
|
||||
/// new `HarmonyEvent` variant yet).
|
||||
pub async fn run_probe<T, P>(probe: &P, topology: &T, policy: RetryPolicy) -> ProbeOutcome
|
||||
where
|
||||
T: Topology,
|
||||
P: Probe<T> + ?Sized,
|
||||
{
|
||||
let start = Instant::now();
|
||||
let mut attempts: u32 = 0;
|
||||
|
||||
loop {
|
||||
attempts += 1;
|
||||
// Each iteration produces either an early return (Ok / Fatal)
|
||||
// or the most recent Retry observation. Threading that
|
||||
// observation back into the next iteration only happens
|
||||
// implicitly via the timeout check below, so there's no
|
||||
// long-lived `last_observation` to lint over.
|
||||
let last_observation = match probe.check(topology).await {
|
||||
ProbeAttempt::Ok(detail) => {
|
||||
debug!(
|
||||
probe = %probe.name(),
|
||||
attempts,
|
||||
elapsed_ms = start.elapsed().as_millis() as u64,
|
||||
"probe passed",
|
||||
);
|
||||
return ProbeOutcome::Pass {
|
||||
elapsed: start.elapsed(),
|
||||
attempts,
|
||||
detail,
|
||||
};
|
||||
}
|
||||
ProbeAttempt::Fatal(detail) => {
|
||||
warn!(
|
||||
probe = %probe.name(),
|
||||
attempts,
|
||||
%detail,
|
||||
"probe rejected (fatal)",
|
||||
);
|
||||
return ProbeOutcome::Fail {
|
||||
elapsed: start.elapsed(),
|
||||
attempts,
|
||||
failure: ProbeFailure::Rejected { detail },
|
||||
};
|
||||
}
|
||||
ProbeAttempt::Retry(obs) => {
|
||||
debug!(
|
||||
probe = %probe.name(),
|
||||
attempts,
|
||||
observation = %obs,
|
||||
"probe retry",
|
||||
);
|
||||
obs
|
||||
}
|
||||
};
|
||||
|
||||
if start.elapsed() >= policy.timeout {
|
||||
warn!(
|
||||
probe = %probe.name(),
|
||||
attempts,
|
||||
budget_ms = policy.timeout.as_millis() as u64,
|
||||
last_observation = %last_observation,
|
||||
"probe timed out",
|
||||
);
|
||||
return ProbeOutcome::Fail {
|
||||
elapsed: start.elapsed(),
|
||||
attempts,
|
||||
failure: ProbeFailure::Timeout {
|
||||
budget: policy.timeout,
|
||||
last_observation,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
tokio::time::sleep(policy.interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use harmony::topology::PreparationOutcome;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::{AtomicU32, Ordering};
|
||||
|
||||
// -- ProbeName -----------------------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn probe_name_accepts_normal_strings() {
|
||||
assert!(ProbeName::try_new("nats-reachable").is_ok());
|
||||
assert!(ProbeName::try_new("HTTP / health").is_ok());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn probe_name_rejects_empty() {
|
||||
assert_eq!(ProbeName::try_new(""), Err(InvalidProbeName::Empty));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn probe_name_rejects_control_chars() {
|
||||
// Dashboard renders these as garbage — fail at construction.
|
||||
assert_eq!(
|
||||
ProbeName::try_new("bad\nname"),
|
||||
Err(InvalidProbeName::ControlChar)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn probe_name_rejects_overlong() {
|
||||
let long = "x".repeat(129);
|
||||
assert_eq!(
|
||||
ProbeName::try_new(long.clone()),
|
||||
Err(InvalidProbeName::TooLong(129))
|
||||
);
|
||||
}
|
||||
|
||||
// -- RetryPolicy ---------------------------------------------------------
|
||||
|
||||
#[test]
|
||||
fn retry_policy_once_is_single_attempt() {
|
||||
let p = RetryPolicy::once();
|
||||
assert_eq!(p.timeout(), Duration::ZERO);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic(expected = "non-zero interval")]
|
||||
fn retry_policy_polling_rejects_zero_interval() {
|
||||
// Zero interval + non-trivial check is a spin loop. The
|
||||
// type system can't catch this, so a runtime assert at
|
||||
// construction is the next best thing — fail loud, fail
|
||||
// early, not on the operator's deploy log.
|
||||
let _ = RetryPolicy::polling(Duration::ZERO, Duration::from_secs(5));
|
||||
}
|
||||
|
||||
// -- run_probe orchestration --------------------------------------------
|
||||
|
||||
/// Test probe that returns a scripted sequence of attempts, then
|
||||
/// loops forever on the last one. Lets us simulate
|
||||
/// "retry-then-ok" without real network I/O.
|
||||
#[derive(Debug)]
|
||||
struct ScriptedProbe {
|
||||
name: ProbeName,
|
||||
script: Mutex<Vec<ProbeAttempt>>,
|
||||
calls: AtomicU32,
|
||||
}
|
||||
|
||||
impl ScriptedProbe {
|
||||
fn new(name: &str, script: Vec<ProbeAttempt>) -> Self {
|
||||
Self {
|
||||
name: ProbeName::try_new(name).unwrap(),
|
||||
script: Mutex::new(script),
|
||||
calls: AtomicU32::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology> Probe<T> for ScriptedProbe {
|
||||
fn name(&self) -> &ProbeName {
|
||||
&self.name
|
||||
}
|
||||
async fn check(&self, _t: &T) -> ProbeAttempt {
|
||||
self.calls.fetch_add(1, Ordering::SeqCst);
|
||||
let mut s = self.script.lock().unwrap();
|
||||
if s.len() == 1 {
|
||||
s[0].clone()
|
||||
} else {
|
||||
s.remove(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NoopTopology;
|
||||
|
||||
#[async_trait]
|
||||
impl Topology for NoopTopology {
|
||||
fn name(&self) -> &str {
|
||||
"noop"
|
||||
}
|
||||
async fn ensure_ready(
|
||||
&self,
|
||||
) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
|
||||
Ok(PreparationOutcome::Noop)
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn ok_on_first_attempt_yields_pass() {
|
||||
let probe = ScriptedProbe::new("instant", vec![ProbeAttempt::Ok(Some("hi".to_string()))]);
|
||||
let outcome = run_probe(
|
||||
&probe,
|
||||
&NoopTopology,
|
||||
RetryPolicy::polling(Duration::from_millis(1), Duration::from_millis(50)),
|
||||
)
|
||||
.await;
|
||||
match outcome {
|
||||
ProbeOutcome::Pass {
|
||||
attempts, detail, ..
|
||||
} => {
|
||||
assert_eq!(attempts, 1);
|
||||
assert_eq!(detail.as_deref(), Some("hi"));
|
||||
}
|
||||
other => panic!("expected Pass, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn retries_until_ok_within_budget() {
|
||||
let probe = ScriptedProbe::new(
|
||||
"eventual",
|
||||
vec![
|
||||
ProbeAttempt::Retry("not yet (1)".to_string()),
|
||||
ProbeAttempt::Retry("not yet (2)".to_string()),
|
||||
ProbeAttempt::Ok(None),
|
||||
],
|
||||
);
|
||||
let outcome = run_probe(
|
||||
&probe,
|
||||
&NoopTopology,
|
||||
RetryPolicy::polling(Duration::from_millis(2), Duration::from_millis(500)),
|
||||
)
|
||||
.await;
|
||||
match outcome {
|
||||
ProbeOutcome::Pass { attempts, .. } => assert_eq!(attempts, 3),
|
||||
other => panic!("expected Pass after retries, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fatal_short_circuits_without_consuming_full_budget() {
|
||||
let probe = ScriptedProbe::new(
|
||||
"definite-no",
|
||||
vec![ProbeAttempt::Fatal("wrong selector".to_string())],
|
||||
);
|
||||
let start = Instant::now();
|
||||
let outcome = run_probe(
|
||||
&probe,
|
||||
&NoopTopology,
|
||||
RetryPolicy::polling(Duration::from_millis(5), Duration::from_secs(10)),
|
||||
)
|
||||
.await;
|
||||
// The whole point of Fatal is that we don't burn the budget.
|
||||
// Generous upper bound — CI variance, not a real check.
|
||||
assert!(
|
||||
start.elapsed() < Duration::from_secs(1),
|
||||
"Fatal should return immediately, took {:?}",
|
||||
start.elapsed()
|
||||
);
|
||||
match outcome {
|
||||
ProbeOutcome::Fail {
|
||||
failure: ProbeFailure::Rejected { detail },
|
||||
..
|
||||
} => assert_eq!(detail, "wrong selector"),
|
||||
other => panic!("expected Rejected, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn always_retry_yields_timeout_with_last_observation() {
|
||||
let probe = ScriptedProbe::new(
|
||||
"never-ready",
|
||||
vec![ProbeAttempt::Retry("still pending".to_string())],
|
||||
);
|
||||
let outcome = run_probe(
|
||||
&probe,
|
||||
&NoopTopology,
|
||||
RetryPolicy::polling(Duration::from_millis(5), Duration::from_millis(40)),
|
||||
)
|
||||
.await;
|
||||
match outcome {
|
||||
ProbeOutcome::Fail {
|
||||
failure:
|
||||
ProbeFailure::Timeout {
|
||||
budget,
|
||||
last_observation,
|
||||
},
|
||||
attempts,
|
||||
..
|
||||
} => {
|
||||
assert_eq!(budget, Duration::from_millis(40));
|
||||
assert_eq!(last_observation, "still pending");
|
||||
assert!(
|
||||
attempts >= 2,
|
||||
"expected at least two attempts, got {attempts}"
|
||||
);
|
||||
}
|
||||
other => panic!("expected Timeout, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,15 +0,0 @@
|
||||
//! First-party [`Probe`](super::Probe) implementations.
|
||||
//!
|
||||
//! Phase 0 ships exactly one — [`TcpReachable`] — to validate the
|
||||
//! trait shape against real network I/O. Phase 1 adds HTTP, K8s
|
||||
//! pod-ready, NATS KV. Each future probe is one self-contained
|
||||
//! file under this module.
|
||||
//!
|
||||
//! Probes that need their own crate dependencies (e.g. a future
|
||||
//! `HelmReleaseHealthy` that talks to the helm client) can be
|
||||
//! gated behind a Cargo feature here without touching the core
|
||||
//! smoke types.
|
||||
|
||||
pub mod tcp;
|
||||
|
||||
pub use tcp::TcpReachable;
|
||||
@@ -1,225 +0,0 @@
|
||||
//! `TcpReachable` — the smallest useful probe: open a TCP connection
|
||||
//! to `host:port` within a timeout.
|
||||
//!
|
||||
//! Used as the first stage of practically every fleet smoke suite:
|
||||
//! "before checking the operator's `/healthz`, prove the cluster IP
|
||||
//! even routes". A connect-refused is a `Retry` (we may have caught
|
||||
//! the service mid-startup); a DNS or address-syntax error is
|
||||
//! `Fatal` (no amount of polling will fix it).
|
||||
//!
|
||||
//! Implemented with `tokio::net::TcpStream`. No new crate
|
||||
//! dependencies — `tokio` is already pulled in with the `full`
|
||||
//! feature by this crate's `Cargo.toml`.
|
||||
|
||||
use std::io;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use tokio::net::TcpStream;
|
||||
|
||||
use harmony::topology::Topology;
|
||||
|
||||
use crate::companion::smoke::probe::{Probe, ProbeAttempt, ProbeName};
|
||||
|
||||
/// Probe that connects to `host:port` and immediately drops the
|
||||
/// socket. Pass on `connect succeeded`.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct TcpReachable {
|
||||
name: ProbeName,
|
||||
address: String,
|
||||
connect_timeout: Duration,
|
||||
}
|
||||
|
||||
impl TcpReachable {
|
||||
/// Default connect timeout is 1s — short enough that a polling
|
||||
/// retry sees several attempts; long enough that a slow host
|
||||
/// doesn't false-fail. Override with
|
||||
/// [`Self::with_connect_timeout`].
|
||||
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
pub fn new(name: ProbeName, address: impl Into<String>) -> Self {
|
||||
Self {
|
||||
name,
|
||||
address: address.into(),
|
||||
connect_timeout: Self::DEFAULT_CONNECT_TIMEOUT,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_connect_timeout(mut self, t: Duration) -> Self {
|
||||
self.connect_timeout = t;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn address(&self) -> &str {
|
||||
&self.address
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology> Probe<T> for TcpReachable {
|
||||
fn name(&self) -> &ProbeName {
|
||||
&self.name
|
||||
}
|
||||
|
||||
async fn check(&self, _topology: &T) -> ProbeAttempt {
|
||||
let connect = TcpStream::connect(self.address.as_str());
|
||||
match tokio::time::timeout(self.connect_timeout, connect).await {
|
||||
Ok(Ok(_stream)) => ProbeAttempt::Ok(Some(format!("connected to {}", self.address))),
|
||||
Ok(Err(e)) => classify_connect_error(&self.address, e),
|
||||
Err(_elapsed) => ProbeAttempt::Retry(format!(
|
||||
"connect to {} timed out after {:?}",
|
||||
self.address, self.connect_timeout
|
||||
)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Most connect errors look like "not ready yet" — we want to keep
|
||||
/// polling. A small set is definitive and should short-circuit the
|
||||
/// retry budget. `InvalidInput` and `Unsupported` both mean the
|
||||
/// address is unparseable / unroutable on this host — no number of
|
||||
/// retries will fix that.
|
||||
fn classify_connect_error(address: &str, e: io::Error) -> ProbeAttempt {
|
||||
match e.kind() {
|
||||
io::ErrorKind::InvalidInput => {
|
||||
ProbeAttempt::Fatal(format!("invalid address {address}: {e}"))
|
||||
}
|
||||
io::ErrorKind::Unsupported => {
|
||||
ProbeAttempt::Fatal(format!("unsupported address {address}: {e}"))
|
||||
}
|
||||
// ConnectionRefused / ConnectionReset / TimedOut /
|
||||
// HostUnreachable / NetworkUnreachable / NotFound (rare) —
|
||||
// all consistent with "service not up yet". Keep polling.
|
||||
_ => ProbeAttempt::Retry(format!("connect to {address}: {e}")),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::companion::smoke::probe::{ProbeFailure, ProbeOutcome, RetryPolicy, run_probe};
|
||||
use async_trait::async_trait;
|
||||
use harmony::topology::PreparationOutcome;
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NoopTopology;
|
||||
|
||||
#[async_trait]
|
||||
impl Topology for NoopTopology {
|
||||
fn name(&self) -> &str {
|
||||
"noop"
|
||||
}
|
||||
async fn ensure_ready(
|
||||
&self,
|
||||
) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
|
||||
Ok(PreparationOutcome::Noop)
|
||||
}
|
||||
}
|
||||
|
||||
fn name(s: &str) -> ProbeName {
|
||||
ProbeName::try_new(s).unwrap()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn passes_against_a_bound_listener() {
|
||||
// Bind to a kernel-assigned port and prove the probe sees
|
||||
// the listener. Using 127.0.0.1 keeps this hermetic — no
|
||||
// outbound network, no flakiness from name resolution.
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let probe = TcpReachable::new(name("local-listener"), addr.to_string());
|
||||
match run_probe(&probe, &NoopTopology, RetryPolicy::once()).await {
|
||||
ProbeOutcome::Pass { attempts, .. } => assert_eq!(attempts, 1),
|
||||
other => panic!("expected pass, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn retries_then_passes_when_listener_appears_late() {
|
||||
// Pick a free port, then race: probe runs immediately, then
|
||||
// we bind 30 ms later. The polling retry should catch it.
|
||||
let pre = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = pre.local_addr().unwrap();
|
||||
drop(pre); // free the port
|
||||
|
||||
let probe = TcpReachable::new(name("late-listener"), addr.to_string())
|
||||
.with_connect_timeout(Duration::from_millis(50));
|
||||
|
||||
let (probe_result, _) = tokio::join!(
|
||||
run_probe(
|
||||
&probe,
|
||||
&NoopTopology,
|
||||
RetryPolicy::polling(Duration::from_millis(10), Duration::from_secs(2)),
|
||||
),
|
||||
async {
|
||||
tokio::time::sleep(Duration::from_millis(40)).await;
|
||||
let listener = TcpListener::bind(addr).await.expect("rebind");
|
||||
// Hold the listener until the probe is satisfied.
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
drop(listener);
|
||||
}
|
||||
);
|
||||
|
||||
match probe_result {
|
||||
ProbeOutcome::Pass { attempts, .. } => {
|
||||
assert!(
|
||||
attempts >= 2,
|
||||
"expected at least 2 attempts, got {attempts}"
|
||||
);
|
||||
}
|
||||
other => panic!("expected eventual pass, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn times_out_on_unused_port() {
|
||||
// Bind+drop to confirm the port is free in the kernel's
|
||||
// sense, then probe against it with a tight budget. With
|
||||
// nothing listening, the OS returns ECONNREFUSED, which we
|
||||
// classify as Retry → budget expiry → Timeout.
|
||||
let pre = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = pre.local_addr().unwrap();
|
||||
drop(pre);
|
||||
|
||||
let probe = TcpReachable::new(name("nothing-here"), addr.to_string())
|
||||
.with_connect_timeout(Duration::from_millis(50));
|
||||
let outcome = run_probe(
|
||||
&probe,
|
||||
&NoopTopology,
|
||||
RetryPolicy::polling(Duration::from_millis(20), Duration::from_millis(100)),
|
||||
)
|
||||
.await;
|
||||
match outcome {
|
||||
ProbeOutcome::Fail {
|
||||
failure: ProbeFailure::Timeout { .. },
|
||||
..
|
||||
} => {}
|
||||
other => panic!("expected Timeout, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn fatal_on_unparseable_address() {
|
||||
// Connect to a syntactically invalid address. No amount of
|
||||
// retrying will help; we should short-circuit to Fatal.
|
||||
let probe = TcpReachable::new(name("bad-addr"), "not-an-address");
|
||||
let outcome = run_probe(
|
||||
&probe,
|
||||
&NoopTopology,
|
||||
RetryPolicy::polling(Duration::from_millis(20), Duration::from_secs(5)),
|
||||
)
|
||||
.await;
|
||||
match outcome {
|
||||
ProbeOutcome::Fail {
|
||||
failure: ProbeFailure::Rejected { detail },
|
||||
attempts,
|
||||
..
|
||||
} => {
|
||||
assert!(detail.contains("not-an-address"));
|
||||
assert_eq!(attempts, 1, "Fatal must short-circuit on attempt 1");
|
||||
}
|
||||
other => panic!("expected Rejected, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,287 +0,0 @@
|
||||
//! Smoke suite — an ordered sequence of [`Probe`]s with per-probe
|
||||
//! retry budgets, and the report that comes back from running it.
|
||||
//!
|
||||
//! Suites are sequential by design in Phase 0. The mental model is
|
||||
//! "first prove the basic connectivity, then prove the derived
|
||||
//! state" — which is inherently ordered. Parallel stages are a
|
||||
//! deferred extension; the public API never exposed sequentiality
|
||||
//! as a guarantee, so adding `parallel_stage` later is additive.
|
||||
//!
|
||||
//! A [`SmokeReport`] is a pure value — easy to log, easy to
|
||||
//! serialize for the dashboard, easy to assert against in tests. It
|
||||
//! deliberately mirrors the
|
||||
//! [`Outcome`](harmony::interpret::Outcome)/[`InterpretError`](harmony::interpret::InterpretError)
|
||||
//! split that the rest of the framework uses: one type per "what
|
||||
//! happened", not a Result soup.
|
||||
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use harmony::topology::Topology;
|
||||
use tracing::{info, info_span};
|
||||
|
||||
use super::probe::{Probe, ProbeName, ProbeOutcome, RetryPolicy, run_probe};
|
||||
|
||||
/// One stage in a [`SmokeSuite`]: a probe plus the retry budget that
|
||||
/// applies to *this* invocation.
|
||||
struct SmokeStage<T: Topology> {
|
||||
probe: Box<dyn Probe<T>>,
|
||||
policy: RetryPolicy,
|
||||
}
|
||||
|
||||
/// An ordered, retry-aware composition of probes for one
|
||||
/// [`SmokeTest`](super::SmokeTest).
|
||||
///
|
||||
/// Built via [`SmokeSuite::new`] + [`SmokeSuite::stage`]. Run via
|
||||
/// [`SmokeSuite::run`]. The same probe type can appear multiple
|
||||
/// times with different policies — that's the point of supplying
|
||||
/// the policy at the stage level, not on the probe itself.
|
||||
pub struct SmokeSuite<T: Topology> {
|
||||
stages: Vec<SmokeStage<T>>,
|
||||
}
|
||||
|
||||
impl<T: Topology> Default for SmokeSuite<T> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Topology> std::fmt::Debug for SmokeSuite<T> {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("SmokeSuite")
|
||||
.field(
|
||||
"stages",
|
||||
&self
|
||||
.stages
|
||||
.iter()
|
||||
.map(|s| s.probe.name().as_str())
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Topology> SmokeSuite<T> {
|
||||
pub fn new() -> Self {
|
||||
Self { stages: Vec::new() }
|
||||
}
|
||||
|
||||
/// Append a stage. Builder-style so an `assemble` method reads
|
||||
/// like a checklist:
|
||||
///
|
||||
/// ```ignore
|
||||
/// SmokeSuite::new()
|
||||
/// .stage(TcpReachable::new(name, "nats:4222"), RetryPolicy::polling(...))
|
||||
/// .stage(HttpHealthy::new(name, "/healthz"), RetryPolicy::polling(...))
|
||||
/// ```
|
||||
pub fn stage<P>(mut self, probe: P, policy: RetryPolicy) -> Self
|
||||
where
|
||||
P: Probe<T> + 'static,
|
||||
{
|
||||
self.stages.push(SmokeStage {
|
||||
probe: Box::new(probe) as Box<dyn Probe<T>>,
|
||||
policy,
|
||||
});
|
||||
self
|
||||
}
|
||||
|
||||
pub fn len(&self) -> usize {
|
||||
self.stages.len()
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.stages.is_empty()
|
||||
}
|
||||
|
||||
/// Drive every stage in declared order. Stops on the first
|
||||
/// failure — a smoke test is a chain of preconditions, so
|
||||
/// continuing past a failure rarely produces useful information
|
||||
/// and risks flooding the dashboard with cascade failures.
|
||||
pub async fn run(self, topology: &T) -> SmokeReport {
|
||||
let span = info_span!("smoke_suite", probes = self.stages.len());
|
||||
let _guard = span.enter();
|
||||
|
||||
let start = Instant::now();
|
||||
let mut probes = Vec::with_capacity(self.stages.len());
|
||||
|
||||
for stage in self.stages {
|
||||
let name = stage.probe.name().clone();
|
||||
info!(probe = %name, "smoke stage started");
|
||||
let outcome = run_probe(&*stage.probe, topology, stage.policy).await;
|
||||
let passed = outcome.passed();
|
||||
info!(
|
||||
probe = %name,
|
||||
passed,
|
||||
elapsed_ms = elapsed_ms(&outcome),
|
||||
"smoke stage finished",
|
||||
);
|
||||
probes.push(ProbeReport { name, outcome });
|
||||
if !passed {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SmokeReport {
|
||||
probes,
|
||||
elapsed: start.elapsed(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn elapsed_ms(outcome: &ProbeOutcome) -> u64 {
|
||||
let d = match outcome {
|
||||
ProbeOutcome::Pass { elapsed, .. } | ProbeOutcome::Fail { elapsed, .. } => *elapsed,
|
||||
};
|
||||
d.as_millis() as u64
|
||||
}
|
||||
|
||||
/// What one probe stage produced. The `name` is repeated here (also
|
||||
/// owned by the probe itself) so a `SmokeReport` is fully
|
||||
/// self-contained — the probe `Box` is dropped after the suite runs.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProbeReport {
|
||||
pub name: ProbeName,
|
||||
pub outcome: ProbeOutcome,
|
||||
}
|
||||
|
||||
impl ProbeReport {
|
||||
pub fn passed(&self) -> bool {
|
||||
self.outcome.passed()
|
||||
}
|
||||
}
|
||||
|
||||
/// What [`SmokeSuite::run`] returns. The dashboard reads probes top
|
||||
/// to bottom; the orchestrator reads `passed()` to gate the deploy.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SmokeReport {
|
||||
pub probes: Vec<ProbeReport>,
|
||||
pub elapsed: Duration,
|
||||
}
|
||||
|
||||
impl SmokeReport {
|
||||
/// `true` iff every probe in the suite passed.
|
||||
///
|
||||
/// An empty suite passes — there's nothing to fail. Callers who
|
||||
/// want "a smoke test ran" semantics should check `!is_empty()`
|
||||
/// in addition.
|
||||
pub fn passed(&self) -> bool {
|
||||
self.probes.iter().all(ProbeReport::passed)
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.probes.is_empty()
|
||||
}
|
||||
|
||||
/// Iterator over only the failed probes — convenient for
|
||||
/// rendering "X failures of Y" without filtering at the call
|
||||
/// site.
|
||||
pub fn failed(&self) -> impl Iterator<Item = &ProbeReport> {
|
||||
self.probes.iter().filter(|p| !p.passed())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::companion::smoke::probe::{ProbeAttempt, ProbeFailure};
|
||||
use async_trait::async_trait;
|
||||
use harmony::topology::PreparationOutcome;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct NoopTopology;
|
||||
|
||||
#[async_trait]
|
||||
impl Topology for NoopTopology {
|
||||
fn name(&self) -> &str {
|
||||
"noop"
|
||||
}
|
||||
async fn ensure_ready(
|
||||
&self,
|
||||
) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
|
||||
Ok(PreparationOutcome::Noop)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct Always {
|
||||
name: ProbeName,
|
||||
attempt: ProbeAttempt,
|
||||
}
|
||||
|
||||
impl Always {
|
||||
fn new(name: &str, attempt: ProbeAttempt) -> Self {
|
||||
Self {
|
||||
name: ProbeName::try_new(name).unwrap(),
|
||||
attempt,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology> Probe<T> for Always {
|
||||
fn name(&self) -> &ProbeName {
|
||||
&self.name
|
||||
}
|
||||
async fn check(&self, _t: &T) -> ProbeAttempt {
|
||||
self.attempt.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn empty_suite_passes_vacuously() {
|
||||
let report = SmokeSuite::<NoopTopology>::new().run(&NoopTopology).await;
|
||||
assert!(report.passed());
|
||||
assert!(report.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn all_pass_stages_aggregates_pass() {
|
||||
let report = SmokeSuite::<NoopTopology>::new()
|
||||
.stage(
|
||||
Always::new("a", ProbeAttempt::Ok(None)),
|
||||
RetryPolicy::once(),
|
||||
)
|
||||
.stage(
|
||||
Always::new("b", ProbeAttempt::Ok(Some("ok".to_string()))),
|
||||
RetryPolicy::once(),
|
||||
)
|
||||
.run(&NoopTopology)
|
||||
.await;
|
||||
assert!(report.passed());
|
||||
assert_eq!(report.probes.len(), 2);
|
||||
assert!(report.failed().count() == 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn first_failure_short_circuits_remaining_stages() {
|
||||
// The third stage must never run. We assert by name —
|
||||
// its presence in the report would prove a regression.
|
||||
let report = SmokeSuite::<NoopTopology>::new()
|
||||
.stage(
|
||||
Always::new("a", ProbeAttempt::Ok(None)),
|
||||
RetryPolicy::once(),
|
||||
)
|
||||
.stage(
|
||||
Always::new("b", ProbeAttempt::Fatal("nope".to_string())),
|
||||
RetryPolicy::once(),
|
||||
)
|
||||
.stage(
|
||||
Always::new("c", ProbeAttempt::Ok(None)),
|
||||
RetryPolicy::once(),
|
||||
)
|
||||
.run(&NoopTopology)
|
||||
.await;
|
||||
assert!(!report.passed());
|
||||
assert_eq!(report.probes.len(), 2);
|
||||
assert_eq!(report.probes[0].name.as_str(), "a");
|
||||
assert_eq!(report.probes[1].name.as_str(), "b");
|
||||
// Cascading failures are noise — confirm only the actual
|
||||
// culprit appears.
|
||||
match &report.probes[1].outcome {
|
||||
ProbeOutcome::Fail {
|
||||
failure: ProbeFailure::Rejected { detail },
|
||||
..
|
||||
} => assert_eq!(detail, "nope"),
|
||||
other => panic!("expected Rejected, got {other:?}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -25,6 +25,13 @@
|
||||
//! companion seam from P7. First-party `TcpReachable` probe ships
|
||||
//! in this PR; HTTP/K8s/NATS probes and a real
|
||||
//! `FleetOperatorSmokeTest` follow in Phase 1.
|
||||
//! - [`companion::logs`] — the log-tail contract. `LogQuery`
|
||||
//! companion trait, `LogChunk` value, `PodmanLogQuery` NATS
|
||||
//! request/reply implementation. Powers the dashboard's "View
|
||||
//! logs" UX. Trait + transport-side impl + unit tests ship in
|
||||
//! this PR; the agent-side `Verb::Logs` handler and the operator
|
||||
//! dashboard handler (`/deployments/<name>/devices/<id>/logs`)
|
||||
//! land in follow-up PRs against the contract locked here.
|
||||
//!
|
||||
//! Out of scope for now:
|
||||
//!
|
||||
@@ -42,6 +49,7 @@ pub mod server;
|
||||
|
||||
pub use agent::{FleetAgentScore, PodTarget};
|
||||
pub use companion::AgentObservation;
|
||||
pub use companion::logs;
|
||||
pub use companion::smoke;
|
||||
pub use nats::{FleetNatsScore, UserPassCredentials};
|
||||
pub use operator::{FleetOperatorScore, OperatorCredentials};
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
//! messages on the inbox (streaming verbs set the `X-Harmony-Final`
|
||||
//! header on the last frame).
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use harmony_types::id::Id;
|
||||
@@ -42,6 +43,10 @@ pub const SUBJECT_PREFIX: &str = "device-commands";
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum Verb {
|
||||
Ping,
|
||||
/// Fetch the last N lines of a deployment's container logs from
|
||||
/// the device. Single-shot reply; live tail (streaming) is a
|
||||
/// later verb.
|
||||
Logs,
|
||||
}
|
||||
|
||||
impl Verb {
|
||||
@@ -51,6 +56,7 @@ impl Verb {
|
||||
pub fn as_subject_token(&self) -> &'static str {
|
||||
match self {
|
||||
Verb::Ping => "ping",
|
||||
Verb::Logs => "logs",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -67,12 +73,14 @@ pub fn device_command_subscription(device_id: &str) -> String {
|
||||
format!("{SUBJECT_PREFIX}.{device_id}.>")
|
||||
}
|
||||
|
||||
/// JSON body of an outbound command request. v1 carries only `Ping`
|
||||
/// (which has no payload). Future verbs add their own struct + variant.
|
||||
/// JSON body of an outbound command request. v1 carried only `Ping`;
|
||||
/// `Logs` is the second verb. Future verbs add their own struct +
|
||||
/// variant.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "verb", rename_all = "lowercase")]
|
||||
pub enum CommandRequest {
|
||||
Ping,
|
||||
Logs(LogsRequest),
|
||||
}
|
||||
|
||||
/// JSON body of a `Verb::Ping` reply.
|
||||
@@ -83,6 +91,55 @@ pub struct PingReply {
|
||||
pub uptime_s: u64,
|
||||
}
|
||||
|
||||
/// JSON body of a `Verb::Logs` request.
|
||||
///
|
||||
/// The deployment lives in the body (not in the subject) so the
|
||||
/// agent's existing wildcard subscription
|
||||
/// `device-commands.<id>.>` keeps working unchanged — the verb stays
|
||||
/// the trailing token. `lines` is bounded at deserialization on the
|
||||
/// agent side; values above [`LOGS_MAX_LINES`] are clamped down so a
|
||||
/// hostile operator can't ask for an unbounded transfer.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct LogsRequest {
|
||||
/// Which deployment on the target device to read logs from.
|
||||
/// Validated by the typed [`crate::DeploymentName`] newtype.
|
||||
pub deployment: crate::DeploymentName,
|
||||
/// How many trailing lines to return. The agent clamps to
|
||||
/// [`LOGS_MAX_LINES`] before invoking the runtime.
|
||||
pub lines: u32,
|
||||
}
|
||||
|
||||
/// JSON body of a `Verb::Logs` reply. Mirrors
|
||||
/// `harmony_fleet_deploy::companion::logs::LogChunk` on the wire —
|
||||
/// kept here (not in the deploy crate) so the agent build, which
|
||||
/// must not depend on harmony, can construct and serialize it.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct LogsReply {
|
||||
/// `ProbeName`-equivalent identifier for the log stream. The deploy
|
||||
/// crate wraps this back into a validated `ProbeName` on receipt.
|
||||
/// On the wire it's a `String` so the agent crate stays free of
|
||||
/// the smoke-companion's validated-newtype surface.
|
||||
pub source: String,
|
||||
/// When the agent captured the logs. The dashboard renders this
|
||||
/// alongside the lines so the user knows the chunk is fresh
|
||||
/// (not a stale cache).
|
||||
pub captured_at: DateTime<Utc>,
|
||||
/// Tail lines, oldest-first, one container line per entry. Empty
|
||||
/// when the container has produced no output yet.
|
||||
pub lines: Vec<String>,
|
||||
/// `true` when the agent's runtime returned a stream that
|
||||
/// exceeded the per-request size budget and we cut it. Surfacing
|
||||
/// this lets the dashboard show a "showing last N of more"
|
||||
/// hint instead of silently truncating.
|
||||
pub truncated: bool,
|
||||
}
|
||||
|
||||
/// Hard upper bound on `LogsRequest::lines`. The agent clamps
|
||||
/// requests above this. 1000 lines comfortably fits in a single
|
||||
/// NATS message under the default 8 MiB max payload and matches the
|
||||
/// `--tail` budget a typical operator wants from the dashboard.
|
||||
pub const LOGS_MAX_LINES: u32 = 1000;
|
||||
|
||||
/// Stable error categories the agent reports on the reply payload
|
||||
/// when a verb can't be handled. The operator-side client maps these
|
||||
/// to its own typed error enum; everything else (no_responders,
|
||||
@@ -155,4 +212,68 @@ mod tests {
|
||||
let json = serde_json::to_string(&CommandRequest::Ping).unwrap();
|
||||
assert_eq!(json, r#"{"verb":"ping"}"#);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn logs_subject_matches_documented_format() {
|
||||
// Subject suffix is the verb token only — the deployment
|
||||
// lives in the request body. Keeps the agent's existing
|
||||
// wildcard subscription (`device-commands.<id>.>`) and
|
||||
// permission template valid without change.
|
||||
assert_eq!(
|
||||
device_command_subject("pi-42", Verb::Logs),
|
||||
"device-commands.pi-42.logs"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn logs_request_round_trips_through_json() {
|
||||
let original = LogsRequest {
|
||||
deployment: crate::DeploymentName::try_new("hello-web").expect("valid"),
|
||||
lines: 100,
|
||||
};
|
||||
let json = serde_json::to_vec(&original).unwrap();
|
||||
let back: LogsRequest = serde_json::from_slice(&json).unwrap();
|
||||
assert_eq!(back, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn logs_reply_round_trips_through_json() {
|
||||
let original = LogsReply {
|
||||
source: "podman:hello-web".to_string(),
|
||||
captured_at: chrono::DateTime::parse_from_rfc3339("2026-01-15T12:00:00Z")
|
||||
.unwrap()
|
||||
.with_timezone(&Utc),
|
||||
lines: vec!["first".to_string(), "second".to_string()],
|
||||
truncated: false,
|
||||
};
|
||||
let json = serde_json::to_vec(&original).unwrap();
|
||||
let back: LogsReply = serde_json::from_slice(&json).unwrap();
|
||||
assert_eq!(back, original);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn command_request_logs_tagged_form_is_stable() {
|
||||
// Locking the wire shape: an older agent that learns about
|
||||
// `Logs` must continue to recognize this exact bytes
|
||||
// sequence as a Logs request.
|
||||
let req = CommandRequest::Logs(LogsRequest {
|
||||
deployment: crate::DeploymentName::try_new("hello-web").expect("valid"),
|
||||
lines: 50,
|
||||
});
|
||||
let json = serde_json::to_string(&req).unwrap();
|
||||
assert_eq!(
|
||||
json,
|
||||
r#"{"verb":"logs","deployment":"hello-web","lines":50}"#
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn logs_max_lines_is_a_sane_budget() {
|
||||
// The constant exists; assert it's not zero (which would
|
||||
// make every legal request return no lines) and not so
|
||||
// huge that a single NATS message would exceed the default
|
||||
// server payload limit.
|
||||
assert!(LOGS_MAX_LINES > 0);
|
||||
assert!(LOGS_MAX_LINES <= 10_000);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,8 +23,8 @@ pub mod status;
|
||||
|
||||
pub use commands::{
|
||||
CommandRequest, ErrorKind, ErrorReply, HDR_DEADLINE, HDR_FINAL, HDR_OPERATOR_SUB,
|
||||
HDR_REQUEST_ID, PingReply, SUBJECT_PREFIX, Verb, device_command_subject,
|
||||
device_command_subscription,
|
||||
HDR_REQUEST_ID, LOGS_MAX_LINES, LogsReply, LogsRequest, PingReply, SUBJECT_PREFIX, Verb,
|
||||
device_command_subject, device_command_subscription,
|
||||
};
|
||||
pub use fleet::{
|
||||
DeploymentName, DeploymentState, DeviceInfo, HeartbeatPayload, InvalidDeploymentName,
|
||||
|
||||
Reference in New Issue
Block a user