refactor(fleet-deploy): collapse smoke companion to one trait, one method #299

Open
johnride wants to merge 1 commits from refactor/smoke-companion-minimal into feat/smoke-test-contract
9 changed files with 638 additions and 1578 deletions

View File

@@ -13,11 +13,11 @@
//! - [`AgentObservation`] — derived view of a fleet agent's KV watch
//! filter, computed from a typed `AgentConfig`. Used by the e2e
//! harness to write desired-state keys the agent will pick up.
//! - [`smoke`] — the smoke-test contract. `Probe` trait, `SmokeSuite`
//! composition, `SmokeTest` companion, and the `deploy`/
//! `deploy_with_smoke` wrappers that bind a Score's interpret to
//! an optional smoke run. Implements ADR-023 P4 ("deploy returns
//! only after smoke-test success") via the companion seam from P7.
//! - [`smoke`] — the smoke-test contract. One `SmokeTest` trait with
//! a single `verify` method returning a `SmokeReport`, plus
//! `deploy` / `deploy_with_smoke` free functions. Implements
//! ADR-023 P4 ("deploy returns only after smoke-test success") via
//! the companion seam from P7.
pub mod agent;
pub mod smoke;

View File

@@ -0,0 +1,633 @@
//! Smoke-test companion — minimal shape.
//!
//! Implements ADR-023 P4 ("deploy returns only after smoke-test
//! success") and P7 ("extend Scores with companions, not API
//! changes"). The earlier draft of this module decomposed smoke into
//! `Probe` / `ProbeAttempt` / `ProbeOutcome` / `ProbeFailure` /
//! `ProbeName` / `RetryPolicy` / `SmokeSuite` / `SmokeStage` /
//! `SmokeReport` / `SmokeTest` / `SmokeAssemblyError`. That was
//! cardinality matching gone overboard for what is, in the end, "run
//! an async function after deploy and surface a pipeline report."
//! This rewrite picks the smallest seam that still buys the things we
//! actually need.
//!
//! ## What we kept
//!
//! - [`SmokeTest`] — one trait, one async method. Implementer writes a
//! normal Rust function and returns a [`SmokeReport`].
//! - Type-level pairing via the associated `type Interpret`
//! (suggested in PR #292 review): one smoke can cover every Score
//! that shares an Interpret (e.g. `HelmChartScore` + any preset on
//! top of it). **Pairing is convention-only in v0.3** —
//! `Score::create_interpret` returns a `Box<dyn Interpret<T>>`, so
//! the `deploy_with_smoke` call site can't statically refuse a
//! mismatched smoke. Closing that gap is an additive refactor on
//! the `Score` trait (`type Interpret: Interpret<T>;`) and is
//! deliberately deferred — the API shape supports it.
//! - [`deploy`] / [`deploy_with_smoke`] — free functions. `deploy`
//! runs the Score; `deploy_with_smoke` runs the Score then blocks
//! on smoke.
//! - Pipeline data that any consumer (CLI, UI, logs) can render
//! top-to-bottom ([`CheckReport`] entries on [`SmokeReport`]).
//!
//! ## What we dropped, and what to do instead
//!
//! | Old type | Replaced by |
//! |---------------------------------------|-----------------------------------------------------------------------------|
//! | `Probe`, `ProbeAttempt`, `ProbeOutcome`, `ProbeFailure` | Write a normal `async fn` inside `verify`; push `CheckReport`s into a `Vec`. |
//! | `ProbeName` (validated newtype) | `&'static str` — the names come from our own static code, not user input. |
//! | `RetryPolicy` value type | [`poll_until`] free function. Pass the budget + interval directly. |
//! | `SmokeSuite` builder | A `Vec<CheckReport>` collected inside `verify`. No intermediate type. |
//! | `TcpReachable` probe impl | [`tcp_reachable`] free function. Call it; it returns a `CheckReport`. |
//!
//! Reusability is preserved: the helpers ([`poll_until`],
//! [`tcp_reachable`]) compose by simple function call. A future
//! `http_healthy(url)` or `k8s_pod_ready(client, name)` belongs in
//! this module as another `async fn -> CheckReport`, not as another
//! trait.
use std::future::Future;
use std::time::{Duration, Instant};

use async_trait::async_trait;
use harmony::interpret::{Interpret, InterpretError, Outcome};
use harmony::inventory::Inventory;
use harmony::score::Score;
use harmony::topology::Topology;
use thiserror::Error;
use tracing::{info, info_span, Instrument};
/// Pair an [`Interpret`] type with an async smoke verification.
///
/// The trait has a single method. Implementers compose their checks
/// inline; there is no probe trait, suite builder, or retry-policy
/// value type. If retries are needed, wrap the check with
/// [`poll_until`]; otherwise just `await` once.
///
/// Associating with `Interpret` (not the concrete `Score`) lets a
/// single smoke impl cover a family of Scores that share an
/// Interpret. Enforcement is convention-only in v0.3 — see the
/// module-level docs.
#[async_trait]
pub trait SmokeTest<T: Topology>: Send + Sync {
/// The Interpret class this smoke verifies. Documentary in v0.3;
/// becomes compile-time enforceable once `Score` gains its own
/// `type Interpret`.
type Interpret: Interpret<T>;
/// Run all checks against `topology` and return them as one
/// [`SmokeReport`]. Implementers are free to short-circuit on the
/// first failure, or to run every check so the report carries a
/// full per-step picture. [`deploy_with_smoke`] reads
/// `report.passed()` to gate the deploy and hands the report back
/// to the caller either way.
async fn verify(&self, topology: &T) -> SmokeReport;
}
/// Aggregated smoke result. Consumers render `checks` top-to-bottom.
Review

SmokeTest companion is not specifically for "Dashboards"... It's there to allow smoke testing after deploying any kind of stuff. The "Dashboard" specificity, which is everywhere in the comments in this file, should be removed.

SmokeTest companion is not specifically for "Dashboards"... It's there to allow smoke testing after deploying any kind of stuff. The "Dashboard" specificity, which is everywhere in the comments in this file, should be removed.
///
/// A report is "passing" iff it has at least one check AND every
/// check passed. An empty report never passes: `deploy_with_smoke`
/// rejects it the same way it rejects a failing one, so a smoke impl
/// that forgets to push any checks fails loudly instead of letting a
/// deploy through.
#[derive(Debug, Clone, Default)]
pub struct SmokeReport {
/// Individual check results, in the order the smoke impl stored them.
pub checks: Vec<CheckReport>,
}
impl SmokeReport {
    /// `true` only when at least one check was recorded and none of
    /// the recorded checks failed.
    pub fn passed(&self) -> bool {
        // An empty report must not count as success, so emptiness is
        // ruled out before looking for failures.
        if self.is_empty() {
            return false;
        }
        self.failed().next().is_none()
    }

    /// `true` when no checks were recorded at all.
    pub fn is_empty(&self) -> bool {
        self.checks.is_empty()
    }

    /// Lazily yields the checks that did not pass, in recorded order,
    /// so callers can render failures without filtering themselves.
    pub fn failed(&self) -> impl Iterator<Item = &CheckReport> {
        self.checks.iter().filter(|check| !check.passed)
    }
}
/// One step's result, named so whoever reads the report (a UI, the
/// operator's log) can tell the steps apart. `detail` is `Some`
/// whenever there's something useful to show (a connect error, the
/// elapsed time, the response body's first line) — leave it `None`
/// for trivial passes.
#[derive(Debug, Clone)]
pub struct CheckReport {
/// Static label for the step; supplied by the smoke impl's own code.
pub name: &'static str,
/// Whether this step succeeded.
pub passed: bool,
/// Optional human-readable context for the result.
pub detail: Option<String>,
}
impl CheckReport {
/// Build a passing check with no detail attached.
pub fn pass(name: &'static str) -> Self {
Self {
name,
passed: true,
detail: None,
}
}
/// Build a passing check that also carries `detail` (converted into
/// an owned `String`).
pub fn pass_with(name: &'static str, detail: impl Into<String>) -> Self {
Self {
name,
passed: true,
detail: Some(detail.into()),
}
}
/// Build a failing check. Unlike [`CheckReport::pass`], a failure
/// always stores `detail`, so every failed step explains itself.
pub fn fail(name: &'static str, detail: impl Into<String>) -> Self {
Review

should maybe be named "fail_with" to be consistent with pass / pass_with as it needs "detail"

should maybe be named "fail_with" to be consistent with pass / pass_with as it needs "detail"
Self {
name,
passed: false,
detail: Some(detail.into()),
}
}
}
/// Poll a fallible async closure until it returns `Ok` or `budget`
/// elapses. Returns a single [`CheckReport`] capturing the outcome.
///
/// Use this inside [`SmokeTest::verify`] when you need retries.
/// Otherwise just `await` once and build a `CheckReport` from the
/// result — no helper required.
///
/// # Behaviour
///
/// - `check` is always attempted at least once, even with a zero
///   `budget`.
/// - Between attempts the function sleeps for `interval`, clamped to
///   the time left in `budget`, so the wait between attempts never
///   pushes the total past `budget` by up to a whole `interval`.
/// - A single attempt is *not* interrupted: if `check` itself hangs,
///   so does this function. Bound the attempt inside the closure
///   (e.g. with `tokio::time::timeout`) when that matters.
/// - On failure the report's `detail` carries the budget and the last
///   error string returned by `check`.
pub async fn poll_until<F, Fut>(
    name: &'static str,
    budget: Duration,
    interval: Duration,
    check: F,
) -> CheckReport
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<(), String>>,
{
    let deadline = Instant::now() + budget;
    loop {
        let last_error = match check().await {
            Ok(()) => return CheckReport::pass(name),
            Err(e) => e,
        };

        // Zero remaining time means the deadline has been reached or
        // passed; `saturating_duration_since` never panics or goes
        // negative.
        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return CheckReport::fail(
                name,
                format!("did not pass within {budget:?}: {last_error}"),
            );
        }

        // Never sleep past the deadline: the final attempt happens at
        // (roughly) the deadline instead of up to `interval` after it.
        tokio::time::sleep(interval.min(remaining)).await;
    }
}
/// Make a single TCP connect attempt to `addr`, giving up after
/// `timeout`, and report the result as a [`CheckReport`] named `name`.
///
/// The report passes when the connection is established, fails with
/// a `connect failed: …` detail when the connect call errors, and
/// fails with a `timeout after …` detail when `timeout` expires first.
///
/// For retry semantics, wrap it with [`poll_until`]:
///
/// ```ignore
/// poll_until("nats-reachable", Duration::from_secs(30), Duration::from_secs(1), || async {
///     tcp_reachable("once", "nats:4222", Duration::from_secs(1))
///         .await
///         .passed
///         .then_some(())
///         .ok_or_else(|| "still not reachable".to_string())
/// }).await
/// ```
pub async fn tcp_reachable(name: &'static str, addr: &str, timeout: Duration) -> CheckReport {
    let connect = tokio::net::TcpStream::connect(addr);

    // Outer `Err` means the timer fired before the connect resolved.
    let Ok(attempt) = tokio::time::timeout(timeout, connect).await else {
        return CheckReport::fail(name, format!("timeout after {timeout:?}"));
    };

    match attempt {
        Err(e) => CheckReport::fail(name, format!("connect failed: {e}")),
        Ok(_) => CheckReport::pass(name),
    }
}
/// What `deploy*` returns on success. `smoke` is `None` for the
/// no-smoke variant, `Some` (with `passed() == true`) otherwise — a
/// report that did not pass is returned as
/// [`DeployError::SmokeFailed`] instead.
#[derive(Debug, Clone)]
pub struct DeployOutcome {
/// The outcome returned by the Score's interpret step.
pub interpret: Outcome,
/// The smoke report, when a smoke test was run.
pub smoke: Option<SmokeReport>,
}
/// The ways `deploy` / `deploy_with_smoke` can fail.
#[derive(Debug, Error)]
pub enum DeployError {
/// The Score's interpret step returned an error; smoke never ran.
#[error("score interpret failed: {0}")]
Interpret(#[from] InterpretError),
/// Interpret succeeded but the smoke report did not pass (at least
/// one failed check, or no checks at all).
#[error(
"smoke failed: {} of {} check(s) did not pass",
.report.failed().count(),
.report.checks.len(),
)]
SmokeFailed {
/// The Score deployed cleanly — preserved so callers can show
/// the success message next to the smoke failure.
interpret: Outcome,
/// The full report, including the checks that did pass.
report: SmokeReport,
},
}
/// Deploy a Score; do **not** run any smoke. Equivalent to calling
/// `score.interpret(...)` directly — provided so callers can switch
/// between smoke / no-smoke at one site.
///
/// # Errors
///
/// Returns [`DeployError::Interpret`] when the Score's interpret
/// fails.
///
/// # Tracing
///
/// The work runs inside a `deploy` span attached with
/// [`Instrument::instrument`] rather than `Span::enter`. An `enter`
/// guard held across `.await` produces incorrect traces and makes the
/// returned future `!Send`; instrumenting the future enters the span
/// only while it is being polled.
pub async fn deploy<S, T>(
    score: S,
    inventory: &Inventory,
    topology: &T,
) -> Result<DeployOutcome, DeployError>
where
    S: Score<T>,
    T: Topology,
{
    // Build the span before `score` is moved into the async block.
    let span = info_span!("deploy", score = score.name());
    let run = async move {
        info!("deploy: starting interpret (no smoke)");
        let interpret = score.interpret(inventory, topology).await?;
        info!(status = %interpret.status, "deploy: interpret returned");
        Ok::<DeployOutcome, DeployError>(DeployOutcome {
            interpret,
            smoke: None,
        })
    };
    run.instrument(span).await
}
/// Deploy a Score and block on its smoke test (ADR-023 P4).
///
/// Runs the Score's interpret first; only if that succeeds is
/// `smoke.verify` called. Returns `Ok` only when every check in the
/// report passed. An empty report is rejected the same way a failing
/// one is — a smoke impl that forgot to push checks fails the deploy
/// rather than silently passing.
///
/// # Errors
///
/// - [`DeployError::Interpret`] when the Score's interpret fails
///   (smoke is not run).
/// - [`DeployError::SmokeFailed`] when the report does not pass; the
///   error carries both the interpret outcome and the full report.
///
/// # Tracing
///
/// The work runs inside a `deploy` span attached with
/// [`Instrument::instrument`] rather than `Span::enter`. An `enter`
/// guard held across `.await` produces incorrect traces and makes the
/// returned future `!Send`; instrumenting the future enters the span
/// only while it is being polled.
pub async fn deploy_with_smoke<S, ST, T>(
    score: S,
    smoke: &ST,
    inventory: &Inventory,
    topology: &T,
) -> Result<DeployOutcome, DeployError>
where
    S: Score<T>,
    ST: SmokeTest<T>,
    T: Topology,
{
    // Build the span before `score` is moved into the async block.
    let span = info_span!("deploy", score = score.name(), smoke = true);
    let run = async move {
        info!("deploy: starting interpret");
        let interpret = score.interpret(inventory, topology).await?;
        info!(status = %interpret.status, "deploy: interpret returned; running smoke");

        let report = smoke.verify(topology).await;
        let passed = report.passed();
        info!(
            passed,
            checks = report.checks.len(),
            failed = report.failed().count(),
            "deploy: smoke returned",
        );

        if passed {
            Ok::<DeployOutcome, DeployError>(DeployOutcome {
                interpret,
                smoke: Some(report),
            })
        } else {
            Err(DeployError::SmokeFailed { interpret, report })
        }
    };
    run.instrument(span).await
}
#[cfg(test)]
mod tests {
//! Behavioural tests for the minimal smoke shape.
//!
//! These exercise the deploy ↔ smoke handoff against a trivial
//! in-process Score / Interpret / Topology so we don't depend on
//! any external infrastructure. The fixtures are intentionally
//! tiny — they only carry what each test actually touches.
use super::*;
use async_trait::async_trait;
use harmony::data::Version;
use harmony::interpret::{InterpretName, InterpretStatus};
use harmony::inventory::Inventory;
use harmony::topology::{PreparationError, PreparationOutcome};
use harmony_types::id::Id;
use serde::Serialize;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
/// Topology fixture with no capabilities; `ensure_ready` is a no-op.
#[derive(Debug)]
struct NoopTopology;
#[async_trait]
impl Topology for NoopTopology {
fn name(&self) -> &str {
"noop"
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
Ok(PreparationOutcome::Noop)
}
}
/// A `Score` whose interpret records that `execute()` ran and
/// returns a configured outcome. The record is a boolean flag, so a
/// test can assert "interpret ran" but not how many times.
#[derive(Debug, Clone)]
struct CountingScore {
executed: Arc<AtomicBool>,
fail: bool,
}
impl CountingScore {
/// Score whose interpret succeeds.
fn new() -> Self {
Self {
executed: Arc::new(AtomicBool::new(false)),
fail: false,
}
}
/// Score whose interpret returns an error.
fn failing() -> Self {
Self {
executed: Arc::new(AtomicBool::new(false)),
fail: true,
}
}
}
// Hand-written so the fixture serializes as a fixed string instead of
// its fields.
impl Serialize for CountingScore {
fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
s.serialize_str("CountingScore")
}
}
#[async_trait]
impl Score<NoopTopology> for CountingScore {
fn create_interpret(&self) -> Box<dyn Interpret<NoopTopology>> {
// The interpret shares the `executed` flag with the Score so the
// test can observe it after the Score has been moved into `deploy`.
Box::new(CountingInterpret {
executed: self.executed.clone(),
fail: self.fail,
})
}
fn name(&self) -> String {
"CountingScore".into()
}
}
/// Interpret fixture paired with [`CountingScore`].
#[derive(Debug)]
struct CountingInterpret {
executed: Arc<AtomicBool>,
fail: bool,
}
#[async_trait]
impl Interpret<NoopTopology> for CountingInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("counting")
}
fn get_version(&self) -> Version {
Version::from("0.1.0").unwrap()
}
fn get_status(&self) -> InterpretStatus {
InterpretStatus::QUEUED
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(
&self,
_i: &Inventory,
_t: &NoopTopology,
) -> Result<Outcome, InterpretError> {
// Set the flag before deciding the outcome, so it is raised for
// both the success and the failure path.
self.executed.store(true, Ordering::SeqCst);
if self.fail {
Err(InterpretError::new(
"counting: configured to fail".to_string(),
))
} else {
Ok(Outcome::success("counting: ok".to_string()))
}
}
}
/// A test smoke that returns a pre-recorded report and records
/// whether `verify` was called. Lets each test inject the exact
/// report shape it wants to drive `deploy_with_smoke` through.
struct FixedSmoke {
report: Mutex<Option<SmokeReport>>,
verify_called: Arc<AtomicBool>,
}
impl FixedSmoke {
fn new(report: SmokeReport) -> Self {
Self {
report: Mutex::new(Some(report)),
verify_called: Arc::new(AtomicBool::new(false)),
}
}
}
#[async_trait]
impl SmokeTest<NoopTopology> for FixedSmoke {
type Interpret = CountingInterpret;
async fn verify(&self, _t: &NoopTopology) -> SmokeReport {
self.verify_called.store(true, Ordering::SeqCst);
// The report is handed out by value exactly once; a second call
// panics, which would flag a double `verify`.
self.report
.lock()
.unwrap()
.take()
.expect("verify called more than once")
}
}
#[tokio::test]
async fn deploy_runs_interpret_and_returns_outcome() {
let score = CountingScore::new();
let executed = score.executed.clone();
let outcome = deploy(score, &Inventory::empty(), &NoopTopology)
.await
.expect("clean interpret should produce Ok");
assert!(executed.load(Ordering::SeqCst), "interpret must run");
assert!(outcome.smoke.is_none(), "no-smoke variant has no report");
assert_eq!(outcome.interpret.message, "counting: ok");
}
#[tokio::test]
async fn deploy_propagates_interpret_failure() {
let result = deploy(CountingScore::failing(), &Inventory::empty(), &NoopTopology).await;
assert!(matches!(result, Err(DeployError::Interpret(_))));
}
#[tokio::test]
async fn deploy_with_smoke_returns_report_on_success() {
let score = CountingScore::new();
let smoke = FixedSmoke::new(SmokeReport {
checks: vec![CheckReport::pass("a"), CheckReport::pass("b")],
});
let outcome = deploy_with_smoke(score, &smoke, &Inventory::empty(), &NoopTopology)
.await
.expect("passing smoke should produce Ok");
let report = outcome.smoke.expect("smoke report present on success");
assert!(report.passed());
assert_eq!(report.checks.len(), 2);
assert!(smoke.verify_called.load(Ordering::SeqCst));
}
#[tokio::test]
async fn deploy_with_smoke_fails_when_any_check_fails() {
// Pin the deploy-blocks-on-smoke contract: a single failed
// check fails the whole deploy, regardless of how many
// passed. The `interpret` field of the error must still be
// populated so callers can show what succeeded alongside
// what didn't.
let smoke = FixedSmoke::new(SmokeReport {
checks: vec![
CheckReport::pass("a"),
CheckReport::fail("b", "bad"),
CheckReport::pass("c"),
],
});
let err = deploy_with_smoke(
CountingScore::new(),
&smoke,
&Inventory::empty(),
&NoopTopology,
)
.await
.expect_err("failing smoke must error");
match err {
DeployError::SmokeFailed { report, interpret } => {
assert!(!report.passed());
assert_eq!(report.failed().count(), 1);
assert_eq!(interpret.message, "counting: ok");
}
other => panic!("expected SmokeFailed, got {other:?}"),
}
}
#[tokio::test]
async fn deploy_with_smoke_rejects_empty_report() {
// A smoke impl that forgot to push any checks must fail
// loudly. Silent pass-through would defeat the entire point
// of the contract.
let smoke = FixedSmoke::new(SmokeReport::default());
let err = deploy_with_smoke(
CountingScore::new(),
&smoke,
&Inventory::empty(),
&NoopTopology,
)
.await
.expect_err("empty smoke must error");
assert!(matches!(err, DeployError::SmokeFailed { .. }));
}
#[tokio::test]
async fn deploy_with_smoke_skips_smoke_when_interpret_fails() {
// Interpret error short-circuits — running smoke against a
// half-deployed component would produce confusing cascade
// failures.
let smoke = FixedSmoke::new(SmokeReport {
checks: vec![CheckReport::pass("never-runs")],
});
let verify_called = smoke.verify_called.clone();
let err = deploy_with_smoke(
CountingScore::failing(),
&smoke,
&Inventory::empty(),
&NoopTopology,
)
.await
.expect_err("interpret failure must error");
assert!(matches!(err, DeployError::Interpret(_)));
assert!(
!verify_called.load(Ordering::SeqCst),
"smoke must NOT run when interpret fails",
);
}
#[tokio::test]
async fn smoke_report_passed_requires_nonempty_and_all_pass() {
// Three cases: empty, mixed pass/fail, all pass.
assert!(!SmokeReport::default().passed());
assert!(
!SmokeReport {
checks: vec![CheckReport::pass("a"), CheckReport::fail("b", "x")],
}
.passed()
);
assert!(
SmokeReport {
checks: vec![CheckReport::pass("a"), CheckReport::pass("b")],
}
.passed()
);
}
#[tokio::test]
async fn poll_until_returns_pass_when_check_succeeds_within_budget() {
// `swap(true)` returns the previous value, so the first attempt
// sees `false` and fails, and the second sees `true` and passes.
let calls = Arc::new(AtomicBool::new(false));
let c = calls.clone();
let check = poll_until(
"eventually-ok",
Duration::from_secs(2),
Duration::from_millis(10),
move || {
let c = c.clone();
async move {
if c.swap(true, Ordering::SeqCst) {
Ok(())
} else {
Err("not yet".into())
}
}
},
)
.await;
assert!(check.passed);
assert_eq!(check.name, "eventually-ok");
}
#[tokio::test]
async fn poll_until_returns_fail_after_budget_elapses() {
let check = poll_until(
"never-ok",
Duration::from_millis(50),
Duration::from_millis(10),
|| async { Err("still nope".into()) },
)
.await;
assert!(!check.passed);
assert!(
check
.detail
.as_deref()
.is_some_and(|d| d.contains("still nope")),
"detail must carry the last error, got {:?}",
check.detail,
);
}
#[tokio::test]
async fn tcp_reachable_passes_against_real_listener() {
// Port 0 asks the OS for a free port; the real one is read back
// from `local_addr`.
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr: SocketAddr = listener.local_addr().unwrap();
// Accept loop so connect() completes; otherwise SYN-RST.
tokio::spawn(async move {
let _ = listener.accept().await;
});
let check = tcp_reachable("loopback", &addr.to_string(), Duration::from_secs(1)).await;
assert!(check.passed, "loopback connect must succeed: {check:?}");
}
#[tokio::test]
async fn tcp_reachable_fails_with_timeout_when_no_listener() {
// RFC 5737 TEST-NET-1: guaranteed-non-routable; a SYN here
// will sit unanswered and we'll hit our own timeout deterministically.
// The assertion below nevertheless accepts either a timeout or a
// connect-failure detail.
let check = tcp_reachable("dead-net", "192.0.2.1:1", Duration::from_millis(150)).await;
assert!(!check.passed);
assert!(
check
.detail
.as_deref()
.is_some_and(|d| d.contains("timeout") || d.contains("connect failed")),
);
}
#[tokio::test]
async fn deploy_outcome_smoke_field_is_none_for_no_smoke_deploy() {
let outcome = deploy(CountingScore::new(), &Inventory::empty(), &NoopTopology)
.await
.unwrap();
assert!(outcome.smoke.is_none());
}
}

View File

@@ -1,82 +0,0 @@
//! The Score → SmokeTest pairing contract.
//!
//! [`SmokeTest`] is the companion trait that pairs a [`Score`] with
//! the probes that verify *that specific* Score's deployment worked.
//! The pairing is type-level via the `Score` associated type — the
//! compiler refuses to call
//! [`deploy_with_smoke`](super::deploy::deploy_with_smoke) with a
//! `FleetOperatorSmokeTest` for a `DnsScore`, because their
//! associated types don't match. This is the same trick that makes
//! `K8sBareTopology: PostgreSQL` an unsatisfied trait bound when you
//! accidentally try to run PostgreSQL on a topology that doesn't
//! advertise it — see ADR-024 §2 and JG's *For the Love of
//! Compilers* talk.
//!
//! ## Why not a method on `Score`?
//!
//! Per ADR-023 P7, framework capabilities attach as companions, not
//! as new methods on `Score`/`Interpret`. The base trait surface
//! stays one method (`create_interpret`). Smoke is *something paired
//! with* a Score, not *something on* a Score. The orthogonal
//! benefit: a Score can be deployed without smoke, can be deployed
//! with one of several smoke configurations (dev, prod, e2e), or
//! can have its smoke evolved independently of its interpret logic.
//!
//! ## Why `assemble` returns a value, not a future-of-checks?
//!
//! The probes a smoke test wants to run are usually known up front
//! from the Score's data — "the service name we just deployed,
//! check it on the cluster's default DNS". Resolving that into a
//! concrete `SmokeSuite` *before* the probes run means the dashboard
//! can render the pipeline (probe names, stage count) the moment
//! smoke starts. If it returned a `Stream<Item = ProbeReport>`, the
//! pipeline shape wouldn't be knowable until the run completed.
use async_trait::async_trait;
use thiserror::Error;
use harmony::score::Score;
use harmony::topology::Topology;
use super::suite::SmokeSuite;
/// Pair a [`Score`] with the [`SmokeSuite`] that proves its deploy
/// converged.
///
/// Implementors live next to the Score they pair with — the
/// `FleetOperatorSmokeTest` ships from this crate alongside
/// `FleetOperatorScore`, the same way [`crate::AgentObservation`]
/// lives alongside `FleetAgentScore`. This is the companion
/// placement rule from ADR-023 §"Extend Scores with companions".
#[async_trait]
pub trait SmokeTest<T: Topology>: Send + Sync {
/// The Score this smoke test verifies. Callers that bound on
/// `SmokeTest<T, Score = S>` get the pairing checked at compile
/// time.
type Score: Score<T>;
/// Build the [`SmokeSuite`] that will run after the Score's
/// `interpret` succeeds. Has access to the Score so probes can
/// be parameterized on the deploy's actual inputs (service
/// names, namespaces, ports), and to the topology so probes can
/// pull credentials / endpoints from declared capabilities.
///
/// Returns a [`SmokeAssemblyError`] when the suite cannot be built.
async fn assemble(
&self,
score: &Self::Score,
topology: &T,
) -> Result<SmokeSuite<T>, SmokeAssemblyError>;
}
/// Why a smoke test failed to *build* (before any probe runs).
///
/// Distinct from probe failure — assembly failure means the Score
/// is in an unexpected shape (missing endpoint, can't resolve a
/// reference). Consumers should present this differently from a
/// probe failure: it's an authoring / configuration error, not a
/// deployed-system error.
#[derive(Debug, Clone, Error, PartialEq, Eq)]
pub enum SmokeAssemblyError {
/// No endpoint could be derived for `what`; `detail` says why.
#[error("smoke test could not derive endpoint for {what}: {detail}")]
MissingEndpoint { what: String, detail: String },
/// Any other assembly failure, described by the message.
#[error("smoke test assembly failed: {0}")]
Other(String),
}

View File

@@ -1,387 +0,0 @@
//! `deploy` and `deploy_with_smoke` — the framework-glue wrappers
//! that bind a [`Score`]'s interpret to an optional [`SmokeTest`].
//!
//! These are free functions, not methods on [`Score`]/[`Maestro`]
//! /[`Interpret`]. The framework's domain directory is left
//! untouched in this PR (ADR-023 P7 — "extend Scores with companions,
//! not API changes"). If a later PR proves this shape, promoting
//! `deploy_with_smoke` to a `Maestro` convenience is a one-line
//! additive change.
//!
//! ## Opting in or out
//!
//! Two entry points, one decision at the call site:
//!
//! - [`deploy`] — run the Score's interpret, no smoke.
//! - [`deploy_with_smoke`] — run the Score's interpret, then the
//! paired [`SmokeTest`]. Returns only after the smoke run
//! completes; a failed smoke is a deploy error per the project
//! rule "deploy blocks on smoke-test".
//!
//! Callers (CLI binaries, the e2e harness, the operator's
//! reconciler) choose which to call. There is no global config knob
//! and no per-Score override — the choice is one function call.
use harmony::interpret::{InterpretError, Outcome};
use harmony::inventory::Inventory;
use harmony::score::Score;
use harmony::topology::Topology;
use thiserror::Error;
use tracing::{info, info_span};
use super::contract::{SmokeAssemblyError, SmokeTest};
use super::suite::SmokeReport;
/// What [`deploy`]/[`deploy_with_smoke`] return on success.
///
/// `interpret` is the underlying `Score::interpret` result —
/// preserved verbatim so callers that previously consumed `Outcome`
/// directly continue to work after switching to these wrappers.
/// `smoke` is `None` when the no-smoke variant was used; `Some`
/// otherwise, with `passed() == true` guaranteed (a failed smoke is
/// a [`DeployError::SmokeFailed`]).
#[derive(Debug, Clone)]
pub struct DeployOutcome {
/// Result of the Score's interpret step.
pub interpret: Outcome,
/// Smoke report, present only when smoke ran.
pub smoke: Option<SmokeReport>,
}
/// All the ways a deploy can fail. Three arms, each pointing at a
/// concrete cause the operator can act on:
///
/// - `Interpret` — the Score's own work failed (helm error, kube
/// apply error, etc). Same shape callers got before smoke existed.
/// - `SmokeAssembly` — the Score deployed, but its `SmokeTest`
/// couldn't even *build* a suite. Typically means the test's
/// author is missing an endpoint reference; an authoring bug, not
/// a runtime bug.
/// - `SmokeFailed` — the Score deployed and the suite built, but
/// one or more probes failed. The full [`SmokeReport`] is
/// attached so callers can show which stages passed and which
/// didn't, instead of a single opaque "deploy failed".
#[derive(Debug, Error)]
pub enum DeployError {
#[error("score interpret failed: {0}")]
Interpret(#[from] InterpretError),
#[error("smoke assembly failed: {0}")]
SmokeAssembly(#[from] SmokeAssemblyError),
#[error("smoke test failed ({} of {} probe(s) did not pass)", .report.failed().count(), .report.probes.len())]
SmokeFailed {
/// The Score deployed cleanly — `interpret` is preserved so
/// callers can render the underlying success message
/// alongside the smoke failure.
interpret: Outcome,
/// The full smoke report, including probes that passed.
report: SmokeReport,
},
}
/// Deploy a Score without running any smoke checks.
///
/// Returns immediately after `score.interpret` returns. Equivalent
/// to calling `score.interpret(...)` directly — provided here so
/// callers can switch between smoke / no-smoke without changing
/// the surrounding control flow.
///
/// An interpret error is converted into [`DeployError::Interpret`].
pub async fn deploy<T, S>(
score: S,
inventory: &Inventory,
topology: &T,
) -> Result<DeployOutcome, DeployError>
where
T: Topology,
S: Score<T>,
{
let span = info_span!("deploy", score = score.name());
// NOTE(review): `_g` stays alive across the `.await` below. The
// tracing docs advise against holding an `enter` guard over await
// points; `Instrument::instrument` is the documented alternative.
let _g = span.enter();
info!("deploy: starting interpret (no smoke)");
let interpret = score.interpret(inventory, topology).await?;
info!(status = %interpret.status, "deploy: interpret returned");
Ok(DeployOutcome {
interpret,
smoke: None,
})
}
/// Deploy a Score, then run its paired SmokeTest. Block on the
/// suite — see project rule "deploy blocks on smoke-test".
///
/// The type bound `SM: SmokeTest<T, Score = S>` is where the
/// compile-time pairing lives: the `SmokeTest` you pass must
/// declare *this* Score type as its associated `Score`. A mismatch
/// is rejected at the call site, not at runtime. (See ADR-024 §2
/// and JG's compile-time-feedback principle.)
///
/// Steps, in order: interpret, assemble the suite, run the suite.
/// A failure at any step returns the matching [`DeployError`]
/// variant and skips the steps after it.
pub async fn deploy_with_smoke<T, S, SM>(
score: S,
smoke: SM,
inventory: &Inventory,
topology: &T,
) -> Result<DeployOutcome, DeployError>
where
T: Topology,
S: Score<T>,
SM: SmokeTest<T, Score = S>,
{
let span = info_span!("deploy", score = score.name(), smoke = true);
// NOTE(review): `_g` stays alive across the `.await`s below. The
// tracing docs advise against holding an `enter` guard over await
// points; `Instrument::instrument` is the documented alternative.
let _g = span.enter();
info!("deploy: starting interpret");
let interpret = score.interpret(inventory, topology).await?;
info!(status = %interpret.status, "deploy: interpret returned, assembling smoke");
let suite = smoke.assemble(&score, topology).await?;
info!(stages = suite.len(), "deploy: smoke suite assembled");
let report = suite.run(topology).await;
info!(
passed = report.passed(),
// `as_millis()` yields a `u128`; the cast to `u64` would only
// truncate for durations far beyond any realistic smoke run.
elapsed_ms = report.elapsed.as_millis() as u64,
"deploy: smoke finished",
);
if !report.passed() {
return Err(DeployError::SmokeFailed { interpret, report });
}
Ok(DeployOutcome {
interpret,
smoke: Some(report),
})
}
// ---- tests -----------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use crate::companion::smoke::probe::{Probe, ProbeAttempt, ProbeName, RetryPolicy};
use crate::companion::smoke::suite::SmokeSuite;
use async_trait::async_trait;
use harmony::data::Version;
use harmony::interpret::{Interpret, InterpretName, InterpretStatus};
use harmony::topology::PreparationOutcome;
use harmony_types::id::Id;
use serde::Serialize;
use std::sync::Mutex;
// -- test fixtures: a NoopTopology + minimal Score + minimal Probe --------
#[derive(Debug)]
struct NoopTopology;
#[async_trait]
impl Topology for NoopTopology {
fn name(&self) -> &str {
"noop"
}
async fn ensure_ready(
&self,
) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
Ok(PreparationOutcome::Noop)
}
}
/// Minimal Score whose interpret can be scripted to succeed or
/// fail. Demonstrates that `deploy_with_smoke` short-circuits
/// correctly when interpret fails.
#[derive(Debug, Clone, Serialize)]
struct TrivialScore {
label: String,
should_fail: bool,
}
impl<T: Topology> Score<T> for TrivialScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(TrivialInterpret {
should_fail: self.should_fail,
label: self.label.clone(),
})
}
fn name(&self) -> String {
format!("TrivialScore({})", self.label)
}
}
#[derive(Debug)]
struct TrivialInterpret {
should_fail: bool,
label: String,
}
#[async_trait]
impl<T: Topology> Interpret<T> for TrivialInterpret {
async fn execute(
&self,
_inventory: &Inventory,
_topology: &T,
) -> Result<Outcome, InterpretError> {
if self.should_fail {
Err(InterpretError::new(format!(
"scripted failure: {}",
self.label
)))
} else {
Ok(Outcome::success(format!("deployed {}", self.label)))
}
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("TrivialInterpret")
}
fn get_version(&self) -> Version {
Version::from("0.0.0").unwrap()
}
fn get_status(&self) -> InterpretStatus {
InterpretStatus::QUEUED
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
}
/// A probe whose every attempt is hardcoded. Lets us script
/// "this stage passes" or "this stage rejects" in tests.
#[derive(Debug)]
struct Always {
name: ProbeName,
attempt: ProbeAttempt,
}
impl Always {
fn new(name: &str, attempt: ProbeAttempt) -> Self {
Self {
name: ProbeName::try_new(name).unwrap(),
attempt,
}
}
}
#[async_trait]
impl<T: Topology> Probe<T> for Always {
fn name(&self) -> &ProbeName {
&self.name
}
async fn check(&self, _t: &T) -> ProbeAttempt {
self.attempt.clone()
}
}
/// Smoke-test double for [`TrivialScore`]: it holds a scripted list
/// of attempts and turns each one into a single-shot stage named
/// after the score's label.
#[derive(Debug)]
struct TrivialSmoke {
    outcomes: Mutex<Vec<ProbeAttempt>>,
}

impl TrivialSmoke {
    fn new(outcomes: Vec<ProbeAttempt>) -> Self {
        let outcomes = Mutex::new(outcomes);
        Self { outcomes }
    }
}

#[async_trait]
impl<T: Topology> SmokeTest<T> for TrivialSmoke {
    type Score = TrivialScore;

    /// Builds one `RetryPolicy::once()` stage per scripted attempt,
    /// named `<label>-probe-<index>`.
    async fn assemble(
        &self,
        score: &Self::Score,
        _topology: &T,
    ) -> Result<SmokeSuite<T>, SmokeAssemblyError> {
        // The guard is a temporary of this one statement, so the lock
        // is released before the function returns.
        let suite = self.outcomes.lock().unwrap().iter().enumerate().fold(
            SmokeSuite::<T>::new(),
            |suite, (i, attempt)| {
                let name = format!("{}-probe-{i}", score.label);
                suite.stage(Always::new(&name, attempt.clone()), RetryPolicy::once())
            },
        );
        Ok(suite)
    }
}
// -- the actual contract tests --------------------------------------------
/// Shorthand for the empty inventory every contract test deploys
/// against.
fn inv() -> Inventory {
    Inventory::empty()
}
/// `deploy` without a companion must return the interpret outcome
/// and leave the smoke slot empty.
#[tokio::test]
async fn deploy_without_smoke_returns_outcome_and_no_report() {
    let score = TrivialScore {
        should_fail: false,
        label: String::from("alpha"),
    };
    let inventory = inv();
    let outcome = deploy(score, &inventory, &NoopTopology)
        .await
        .expect("deploy ok");
    assert_eq!(outcome.interpret.status, InterpretStatus::SUCCESS);
    assert!(
        outcome.smoke.is_none(),
        "no-smoke variant must not synthesize a report"
    );
}
/// A failed interpret must surface as `DeployError::Interpret`
/// without the smoke companion ever being assembled: nothing was
/// deployed, so running smoke would only add cascade noise.
#[tokio::test]
async fn interpret_failure_short_circuits_before_smoke_assembly() {
    // Companion that blows up if it is ever assembled; reaching the
    // assertions below proves the short-circuit happened.
    struct PanicSmoke;

    #[async_trait]
    impl<T: Topology> SmokeTest<T> for PanicSmoke {
        type Score = TrivialScore;

        async fn assemble(
            &self,
            _: &Self::Score,
            _: &T,
        ) -> Result<SmokeSuite<T>, SmokeAssemblyError> {
            panic!("assemble must not be called when interpret fails");
        }
    }

    let failing = TrivialScore {
        should_fail: true,
        label: String::from("beta"),
    };
    let inventory = inv();
    let result = deploy_with_smoke(failing, PanicSmoke, &inventory, &NoopTopology).await;
    match result {
        Err(DeployError::Interpret(e)) => {
            let rendered = e.to_string();
            assert!(rendered.contains("scripted failure"));
        }
        other => panic!("expected Interpret error, got {other:?}"),
    }
}
/// When every scripted probe passes, the deploy succeeds and carries
/// a report with one entry per stage.
#[tokio::test]
async fn all_probes_pass_yields_deployoutcome_with_report() {
    let score = TrivialScore {
        should_fail: false,
        label: String::from("gamma"),
    };
    let scripted = vec![
        ProbeAttempt::Ok(Some(String::from("first stage ok"))),
        ProbeAttempt::Ok(None),
    ];
    let inventory = inv();
    let outcome = deploy_with_smoke(score, TrivialSmoke::new(scripted), &inventory, &NoopTopology)
        .await
        .expect("deploy + smoke ok");
    let report = outcome
        .smoke
        .expect("smoke report present when smoke runs");
    assert!(report.passed());
    assert_eq!(report.probes.len(), 2);
}
/// A rejecting probe turns the deploy into `SmokeFailed`, while the
/// successful interpret outcome is preserved alongside the report.
#[tokio::test]
async fn probe_failure_blocks_deploy() {
    let score = TrivialScore {
        should_fail: false,
        label: String::from("delta"),
    };
    let smoke = TrivialSmoke::new(vec![
        ProbeAttempt::Ok(None),
        ProbeAttempt::Fatal(String::from("definitely broken")),
    ]);
    let inventory = inv();
    let result = deploy_with_smoke(score, smoke, &inventory, &NoopTopology).await;
    match result {
        Err(DeployError::SmokeFailed { interpret, report }) => {
            // The Score converged; only smoke disagrees. Both facts
            // must be visible to whoever renders the failure.
            assert_eq!(interpret.status, InterpretStatus::SUCCESS);
            assert!(!report.passed());
            assert_eq!(report.failed().count(), 1);
            let culprit = report.probes.last().unwrap();
            assert_eq!(culprit.name.as_str(), "delta-probe-1");
        }
        other => panic!("expected SmokeFailed, got {other:?}"),
    }
}
}

View File

@@ -1,62 +0,0 @@
//! Smoke-test contract — the second Score companion (ADR-023 P7,
//! ADR-023 P4 "deploy returns only after smoke-test success").
//!
//! ## The big picture
//!
//! ```text
//! ┌────────────┐   pair    ┌──────────────┐   assemble   ┌────────────┐
//! │   *Score   │◀─type─────│  SmokeTest   │─────────────▶│ SmokeSuite │
//! └────────────┘  locked   └──────────────┘              └────────────┘
//!       ▲                                                      │
//!       │                                                      │
//! deploy_with_smoke ─── interpret ─── then ── run probes ──▶ SmokeReport
//! ```
//!
//! - [`Probe`] is the single-attempt unit, classifying its
//! observation as [`Ok`/`Retry`/`Fatal`](probe::ProbeAttempt). It's
//! also the stage the dashboard renders.
//! - [`SmokeSuite`] is an ordered sequence of probes with per-stage
//! [`RetryPolicy`]. Sequential by design; parallel stages deferred.
//! - [`SmokeTest`] pairs a [`Score`](harmony::score::Score) with a
//! suite by associated type — a `SmokeTest<Score = FleetOperatorScore>`
//! cannot be passed to `deploy_with_smoke(DnsScore, …)` because the
//! compiler refuses the mismatch.
//! - [`deploy`] / [`deploy_with_smoke`] are free functions that
//! bind a Score's interpret to an optional SmokeTest. Returns
//! only after the smoke run completes — see
//! [`DeployError::SmokeFailed`] for the failure shape.
//!
//! ## Where this lives
//!
//! Inside `harmony-fleet-deploy` for Phase 0, alongside the
//! existing [`AgentObservation`](super::AgentObservation) companion.
//! When a second consumer (PostgreSQL deploy, monitoring stack)
//! adopts this contract, the module promotes to a top-level
//! `harmony-smoke` crate with no API churn — the move is mechanical
//! because nothing in here depends on fleet types.
//!
//! ## What this does NOT do
//!
//! - **No new `HarmonyEvent` variants in Phase 0.** Stage progress
//! is emitted via `tracing::info!` events inside `info_span!`s.
//! A later PR can add `HarmonyEvent::SmokeStage{Started,Finished}`
//! if the dashboard wants the typed seam.
//! - **No `Score`/`Interpret` trait edits.** Per ADR-023 P7. The
//! smoke seam is purely companion-shaped.
//! - **No CLI flag yet.** Phase 1 wires `harmony-fleet-deploy <cmd>
//! --no-smoke` once a real `FleetOperatorSmokeTest` exists.
pub mod contract;
pub mod deploy;
pub mod probe;
pub mod probes;
pub mod suite;
pub use contract::{SmokeAssemblyError, SmokeTest};
pub use deploy::{DeployError, DeployOutcome, deploy, deploy_with_smoke};
pub use probe::{
InvalidProbeName, Probe, ProbeAttempt, ProbeFailure, ProbeName, ProbeOutcome, RetryPolicy,
run_probe,
};
pub use probes::TcpReachable;
pub use suite::{ProbeReport, SmokeReport, SmokeSuite};

View File

@@ -1,515 +0,0 @@
//! Probe contract — the single-attempt unit of a smoke run.
//!
//! A [`Probe`] is what gets visualized as one stage of the smoke
//! pipeline. Each implementation answers one question against a
//! deployed system: "is this port reachable", "did this pod become
//! ready", "is this KV key set to the value we expect".
//!
//! The retry loop is **not** a probe responsibility. A probe's
//! [`check`](Probe::check) returns one of three classifications via
//! [`ProbeAttempt`] — `Ok`, `Retry`, or `Fatal` — and [`run_probe`]
//! drives the polling timer + timeout budget around it. This keeps
//! probes single-responsibility and makes their unit tests
//! deterministic.
//!
//! Each public type here is cardinality-matched to the concept it
//! represents (see ADR-024 §2 and JG's *For the Love of Compilers*
//! talk). `ProbeName` is a validated newtype, not `String`, because
//! "an empty probe name" or "a probe name with a newline in it"
//! cannot mean anything useful and would corrupt the dashboard
//! rendering. `ProbeAttempt` is a three-arm sum, not `Result<(), Error>`,
//! because "not ready yet" and "definitively no" must drive different
//! orchestration paths.
use std::fmt;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use thiserror::Error;
use tracing::{debug, warn};
use harmony::topology::Topology;
/// One attempted check by a probe.
///
/// The probe classifies its own observation rather than letting the
/// orchestrator guess: a 5xx response is fundamentally different from
/// a connection refused, even if both look like "failure" to a naive
/// caller. The orchestrator ([`run_probe`]) uses this classification
/// to decide whether to keep polling or bail out: `Ok` becomes
/// [`ProbeOutcome::Pass`], `Fatal` becomes
/// [`ProbeFailure::Rejected`], and a run that only ever sees `Retry`
/// ends as [`ProbeFailure::Timeout`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProbeAttempt {
    /// The probe's invariant holds *right now*. Optional detail is
    /// surfaced in the smoke report (e.g. "HTTP 200 in 12 ms",
    /// "found 3 ready pods").
    Ok(Option<String>),
    /// The probe didn't observe what it's looking for yet, but the
    /// state is consistent with eventual success. Keep polling until
    /// the timeout. The string is the most recent observation, kept
    /// so that a final timeout can quote the last thing we saw.
    Retry(String),
    /// The probe got a definitive negative answer — no amount of
    /// further polling will change it. Skip the rest of the budget
    /// and report failure now. Example: HTTP 404 from a URL that
    /// must exist after deploy, or a `Service` with the wrong
    /// selector.
    Fatal(String),
}
/// Consolidated verdict of one probe after [`run_probe`] has either
/// spent the retry budget or stopped early.
///
/// This is the per-probe shape that lands in
/// [`crate::companion::smoke::SmokeReport`]. The suite pairs it with
/// the [`ProbeName`] so each stage can be rendered independently.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProbeOutcome {
    /// Some attempt returned [`ProbeAttempt::Ok`] within the retry
    /// budget. `attempts` counts every check made, including the
    /// successful one.
    Pass {
        elapsed: Duration,
        attempts: u32,
        detail: Option<String>,
    },
    /// The probe did not pass: either the budget ran out with only
    /// `Retry`s (timeout) or a `Fatal` ended the run (rejected).
    Fail {
        elapsed: Duration,
        attempts: u32,
        failure: ProbeFailure,
    },
}

impl ProbeOutcome {
    /// `true` for [`ProbeOutcome::Pass`], `false` for
    /// [`ProbeOutcome::Fail`].
    pub fn passed(&self) -> bool {
        match self {
            Self::Pass { .. } => true,
            Self::Fail { .. } => false,
        }
    }
}
/// Why a probe failed. Two arms, deliberately — they map to two
/// different operator actions: "I waited and nothing happened" needs
/// a longer budget or a stuck-component investigation; "something
/// gave me a hard no" needs a configuration fix.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum ProbeFailure {
    /// The retry budget ran out while every attempt answered
    /// [`ProbeAttempt::Retry`]. `budget` is the policy's timeout and
    /// `last_observation` is the text of the final `Retry`.
    #[error("timed out after {budget:?} (last observation: {last_observation})")]
    Timeout {
        budget: Duration,
        last_observation: String,
    },
    /// An attempt answered [`ProbeAttempt::Fatal`]; `detail` is the
    /// probe's own explanation.
    #[error("rejected: {detail}")]
    Rejected { detail: String },
}
/// How a single probe is polled.
///
/// `interval` is the pause between attempts. `timeout` is the total
/// wall-clock budget counted from the first attempt; the loop will
/// not start a new attempt past this budget but will let an
/// in-flight one finish.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryPolicy {
    interval: Duration,
    timeout: Duration,
}

impl RetryPolicy {
    /// Standard polling shape: try, wait `interval`, try again, until
    /// an `Ok`/`Fatal` arrives or `timeout` elapses.
    ///
    /// # Panics
    ///
    /// Panics if `interval` is zero — combined with any non-trivial
    /// check that would spin the executor without useful retry
    /// semantics. Use [`Self::once`] to express "run once".
    pub fn polling(interval: Duration, timeout: Duration) -> Self {
        let spins = interval.is_zero();
        assert!(
            !spins,
            "RetryPolicy::polling requires a non-zero interval; use RetryPolicy::once for a single attempt",
        );
        Self { interval, timeout }
    }

    /// Single-shot: one attempt, no retries. Suits probes whose
    /// answer is binary the moment it is checked. Encoded as a zero
    /// timeout; the 1 ms interval is a placeholder.
    pub fn once() -> Self {
        Self {
            timeout: Duration::ZERO,
            interval: Duration::from_millis(1),
        }
    }

    /// Pause between two attempts.
    pub fn interval(&self) -> Duration {
        let Self { interval, .. } = *self;
        interval
    }

    /// Total wall-clock budget for the probe.
    pub fn timeout(&self) -> Duration {
        let Self { timeout, .. } = *self;
        timeout
    }
}
/// A validated probe name.
///
/// Probe names appear in `tracing` events, error messages, the smoke
/// report, and the eventual dashboard. Constraining them at
/// construction means the dashboard never has to defend against
/// surprising bytes (control chars, newlines, etc).
///
/// The only constructor is [`ProbeName::try_new`], so every value is
/// non-empty, at most 128 bytes long, and free of control characters.
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ProbeName(String);
/// Reasons [`ProbeName::try_new`] refuses a candidate name.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum InvalidProbeName {
    /// The candidate was the empty string.
    #[error("probe name must not be empty")]
    Empty,
    /// The candidate was longer than 128 bytes; the payload is its
    /// actual byte length.
    #[error("probe name must not exceed 128 bytes (got {0})")]
    TooLong(usize),
    /// The candidate contained at least one control character
    /// (as judged by `char::is_control`).
    #[error("probe name must not contain control characters")]
    ControlChar,
}
impl ProbeName {
    /// Validates `s` and wraps it.
    ///
    /// # Errors
    ///
    /// Checked in this order: [`InvalidProbeName::Empty`] for an
    /// empty string, [`InvalidProbeName::TooLong`] when the UTF-8
    /// encoding exceeds 128 bytes, [`InvalidProbeName::ControlChar`]
    /// when any character is a control character.
    pub fn try_new(s: impl Into<String>) -> Result<Self, InvalidProbeName> {
        let candidate: String = s.into();
        match candidate.len() {
            0 => Err(InvalidProbeName::Empty),
            bytes if bytes > 128 => Err(InvalidProbeName::TooLong(bytes)),
            _ if candidate.chars().any(char::is_control) => Err(InvalidProbeName::ControlChar),
            _ => Ok(Self(candidate)),
        }
    }

    /// Borrows the validated name.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}
/// Renders the name verbatim, without applying formatter padding or
/// width.
impl fmt::Display for ProbeName {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let raw = self.as_str();
        f.write_str(raw)
    }
}
/// One pipeline stage of a [`SmokeSuite`](super::SmokeSuite).
///
/// Object-safe so a suite can hold a heterogeneous `Vec<Box<dyn
/// Probe<T>>>`. The associated retry behavior is supplied separately
/// by the suite (via [`RetryPolicy`]) so that the same probe type
/// can be reused with different budgets in different suites.
#[async_trait]
pub trait Probe<T: Topology>: Send + Sync + std::fmt::Debug {
    /// Name of this stage; [`run_probe`] attaches it to every
    /// `tracing` event it emits for the probe.
    fn name(&self) -> &ProbeName;
    /// Performs exactly one check against `topology` and classifies
    /// what it saw. Retrying is the caller's job ([`run_probe`]), not
    /// the probe's.
    async fn check(&self, topology: &T) -> ProbeAttempt;
}
/// Drive a probe through its retry policy and return the consolidated
/// outcome.
///
/// Behavior:
///
/// - The first attempt runs immediately. An `Ok` wins at once; a
///   `Fatal` short-circuits to [`ProbeFailure::Rejected`]; a `Retry`
///   records its observation and we wait before trying again.
/// - If `timeout` is `Duration::ZERO` we run exactly one attempt
///   (the "once" shape).
/// - Otherwise the wait between attempts is `interval`, clamped to
///   what is left of `timeout`. After waking we re-check the budget,
///   so no attempt is *started* once `timeout` has elapsed (an
///   in-flight attempt is still allowed to finish). This is the
///   contract documented on [`RetryPolicy`].
/// - On timeout the last `Retry` observation becomes
///   `Timeout::last_observation`, so the failure message tells the
///   operator what we kept seeing.
///
/// The function emits a `tracing` event per attempt at `debug` level
/// and one at `warn` for the final failure (so a Phase-1 dashboard
/// can subscribe via a `tracing` layer without us needing to add a
/// new `HarmonyEvent` variant yet).
pub async fn run_probe<T, P>(probe: &P, topology: &T, policy: RetryPolicy) -> ProbeOutcome
where
    T: Topology,
    P: Probe<T> + ?Sized,
{
    let start = Instant::now();
    let mut attempts: u32 = 0;
    loop {
        attempts += 1;
        // `Ok` and `Fatal` return straight away; only a `Retry` falls
        // through, yielding the observation a timeout would quote.
        let last_observation = match probe.check(topology).await {
            ProbeAttempt::Ok(detail) => {
                debug!(
                    probe = %probe.name(),
                    attempts,
                    elapsed_ms = start.elapsed().as_millis() as u64,
                    "probe passed",
                );
                return ProbeOutcome::Pass {
                    elapsed: start.elapsed(),
                    attempts,
                    detail,
                };
            }
            ProbeAttempt::Fatal(detail) => {
                warn!(
                    probe = %probe.name(),
                    attempts,
                    %detail,
                    "probe rejected (fatal)",
                );
                return ProbeOutcome::Fail {
                    elapsed: start.elapsed(),
                    attempts,
                    failure: ProbeFailure::Rejected { detail },
                };
            }
            ProbeAttempt::Retry(obs) => {
                debug!(
                    probe = %probe.name(),
                    attempts,
                    observation = %obs,
                    "probe retry",
                );
                obs
            }
        };

        // Sleep until the next slot, but never past the budget. The
        // previous shape slept a full `interval` and then attempted
        // again unconditionally, which could start a check well after
        // `timeout` had already elapsed.
        let remaining = policy.timeout.saturating_sub(start.elapsed());
        if !remaining.is_zero() {
            tokio::time::sleep(remaining.min(policy.interval)).await;
            if start.elapsed() < policy.timeout {
                continue;
            }
        }

        warn!(
            probe = %probe.name(),
            attempts,
            budget_ms = policy.timeout.as_millis() as u64,
            last_observation = %last_observation,
            "probe timed out",
        );
        return ProbeOutcome::Fail {
            elapsed: start.elapsed(),
            attempts,
            failure: ProbeFailure::Timeout {
                budget: policy.timeout,
                last_observation,
            },
        };
    }
}
#[cfg(test)]
mod tests {
use super::*;
use harmony::topology::PreparationOutcome;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU32, Ordering};
// -- ProbeName -----------------------------------------------------------
/// Ordinary printable names, including spaces and slashes, are
/// accepted.
#[test]
fn probe_name_accepts_normal_strings() {
    for candidate in ["nats-reachable", "HTTP / health"] {
        assert!(ProbeName::try_new(candidate).is_ok());
    }
}
/// The empty string is refused with `InvalidProbeName::Empty`.
#[test]
fn probe_name_rejects_empty() {
    let rejected = ProbeName::try_new(String::new());
    assert_eq!(rejected, Err(InvalidProbeName::Empty));
}
/// A newline inside the name would render as garbage downstream, so
/// construction must fail.
#[test]
fn probe_name_rejects_control_chars() {
    let rejected = ProbeName::try_new("bad\nname");
    assert_eq!(rejected, Err(InvalidProbeName::ControlChar));
}
/// One byte over the 128-byte limit is refused, and the error
/// reports the offending length.
#[test]
fn probe_name_rejects_overlong() {
    let oversized = "x".repeat(129);
    let rejected = ProbeName::try_new(oversized);
    assert_eq!(rejected, Err(InvalidProbeName::TooLong(129)));
}
// -- RetryPolicy ---------------------------------------------------------
/// `once()` is encoded as a zero budget.
#[test]
fn retry_policy_once_is_single_attempt() {
    let budget = RetryPolicy::once().timeout();
    assert_eq!(budget, Duration::ZERO);
}
/// A zero interval would be a spin loop. The type system cannot rule
/// it out, so the constructor asserts — failing loudly at build-up
/// time rather than in an operator's deploy log.
#[test]
#[should_panic(expected = "non-zero interval")]
fn retry_policy_polling_rejects_zero_interval() {
    let (interval, timeout) = (Duration::ZERO, Duration::from_secs(5));
    let _ = RetryPolicy::polling(interval, timeout);
}
// -- run_probe orchestration --------------------------------------------
/// Test probe that replays a scripted list of attempts front to back
/// and then keeps repeating the final entry. Lets a test simulate
/// "retry-then-ok" without real network I/O.
///
/// The script must contain at least one entry; checking an empty
/// script panics.
#[derive(Debug)]
struct ScriptedProbe {
    name: ProbeName,
    script: Mutex<Vec<ProbeAttempt>>,
    calls: AtomicU32,
}

impl ScriptedProbe {
    fn new(name: &str, script: Vec<ProbeAttempt>) -> Self {
        let name = ProbeName::try_new(name).unwrap();
        Self {
            name,
            calls: AtomicU32::new(0),
            script: Mutex::new(script),
        }
    }
}

#[async_trait]
impl<T: Topology> Probe<T> for ScriptedProbe {
    fn name(&self) -> &ProbeName {
        &self.name
    }

    /// Counts the call, then pops the next scripted attempt — unless
    /// only one is left, which is cloned so it can be served again.
    async fn check(&self, _t: &T) -> ProbeAttempt {
        self.calls.fetch_add(1, Ordering::SeqCst);
        let mut remaining = self.script.lock().unwrap();
        match remaining.len() {
            1 => remaining[0].clone(),
            _ => remaining.remove(0),
        }
    }
}
/// Topology stand-in for the `run_probe` tests: it is named "noop"
/// and reports `PreparationOutcome::Noop` from `ensure_ready`.
#[derive(Debug)]
struct NoopTopology;
#[async_trait]
impl Topology for NoopTopology {
    fn name(&self) -> &str {
        "noop"
    }
    // Nothing to prepare — always succeeds with `Noop`.
    async fn ensure_ready(
        &self,
    ) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
        Ok(PreparationOutcome::Noop)
    }
}
/// An immediate `Ok` must produce `Pass` after exactly one attempt
/// and carry the probe's detail through.
#[tokio::test]
async fn ok_on_first_attempt_yields_pass() {
    let policy = RetryPolicy::polling(Duration::from_millis(1), Duration::from_millis(50));
    let script = vec![ProbeAttempt::Ok(Some(String::from("hi")))];
    let probe = ScriptedProbe::new("instant", script);
    match run_probe(&probe, &NoopTopology, policy).await {
        ProbeOutcome::Pass {
            attempts, detail, ..
        } => {
            assert_eq!(attempts, 1);
            assert_eq!(detail.as_deref(), Some("hi"));
        }
        other => panic!("expected Pass, got {other:?}"),
    }
}
/// Two `Retry`s followed by an `Ok` must pass on the third attempt
/// when the budget is generous.
#[tokio::test]
async fn retries_until_ok_within_budget() {
    let script = vec![
        ProbeAttempt::Retry(String::from("not yet (1)")),
        ProbeAttempt::Retry(String::from("not yet (2)")),
        ProbeAttempt::Ok(None),
    ];
    let probe = ScriptedProbe::new("eventual", script);
    let policy = RetryPolicy::polling(Duration::from_millis(2), Duration::from_millis(500));
    let outcome = run_probe(&probe, &NoopTopology, policy).await;
    match outcome {
        ProbeOutcome::Pass { attempts, .. } => assert_eq!(attempts, 3),
        other => panic!("expected Pass after retries, got {other:?}"),
    }
}
/// A `Fatal` must end the run at once instead of burning the
/// ten-second budget, and must surface as `Rejected`.
#[tokio::test]
async fn fatal_short_circuits_without_consuming_full_budget() {
    let script = vec![ProbeAttempt::Fatal(String::from("wrong selector"))];
    let probe = ScriptedProbe::new("definite-no", script);
    let policy = RetryPolicy::polling(Duration::from_millis(5), Duration::from_secs(10));

    let start = Instant::now();
    let outcome = run_probe(&probe, &NoopTopology, policy).await;

    // Generous ceiling: it only guards against waiting out the
    // budget, it is not a latency assertion.
    assert!(
        start.elapsed() < Duration::from_secs(1),
        "Fatal should return immediately, took {:?}",
        start.elapsed()
    );
    match outcome {
        ProbeOutcome::Fail {
            failure: ProbeFailure::Rejected { detail },
            ..
        } => assert_eq!(detail, "wrong selector"),
        other => panic!("expected Rejected, got {other:?}"),
    }
}
/// A probe that never stops saying `Retry` must time out, quoting
/// the budget and the last observation, after more than one attempt.
#[tokio::test]
async fn always_retry_yields_timeout_with_last_observation() {
    let script = vec![ProbeAttempt::Retry(String::from("still pending"))];
    let probe = ScriptedProbe::new("never-ready", script);
    let policy = RetryPolicy::polling(Duration::from_millis(5), Duration::from_millis(40));
    match run_probe(&probe, &NoopTopology, policy).await {
        ProbeOutcome::Fail {
            failure:
                ProbeFailure::Timeout {
                    budget,
                    last_observation,
                },
            attempts,
            ..
        } => {
            assert_eq!(budget, Duration::from_millis(40));
            assert_eq!(last_observation, "still pending");
            assert!(
                attempts >= 2,
                "expected at least two attempts, got {attempts}"
            );
        }
        other => panic!("expected Timeout, got {other:?}"),
    }
}
}

View File

@@ -1,15 +0,0 @@
//! First-party [`Probe`](super::Probe) implementations.
//!
//! Phase 0 ships exactly one — [`TcpReachable`] — to validate the
//! trait shape against real network I/O. Phase 1 adds HTTP, K8s
//! pod-ready, NATS KV. Each future probe is one self-contained
//! file under this module.
//!
//! Probes that need their own crate dependencies (e.g. a future
//! `HelmReleaseHealthy` that talks to the helm client) can be
//! gated behind a Cargo feature here without touching the core
//! smoke types.
pub mod tcp;
pub use tcp::TcpReachable;

View File

@@ -1,225 +0,0 @@
//! `TcpReachable` — the smallest useful probe: open a TCP connection
//! to `host:port` within a timeout.
//!
//! Used as the first stage of practically every fleet smoke suite:
//! "before checking the operator's `/healthz`, prove the cluster IP
//! even routes". A connect-refused is a `Retry` (we may have caught
//! the service mid-startup); an address rejected with
//! `InvalidInput` or `Unsupported` is `Fatal` (no amount of polling
//! will fix it). Every other connect error kind is retried.
//!
//! Implemented with `tokio::net::TcpStream`. No new crate
//! dependencies — `tokio` is already pulled in with the `full`
//! feature by this crate's `Cargo.toml`.
use std::io;
use std::time::Duration;
use async_trait::async_trait;
use tokio::net::TcpStream;
use harmony::topology::Topology;
use crate::companion::smoke::probe::{Probe, ProbeAttempt, ProbeName};
/// Probe that connects to `host:port` and immediately drops the
/// socket. Pass on `connect succeeded`.
#[derive(Debug, Clone)]
pub struct TcpReachable {
    // Stage name reported through `Probe::name`.
    name: ProbeName,
    // Target handed verbatim to `TcpStream::connect`.
    address: String,
    // Upper bound for one connect attempt; a connect that exceeds it
    // is classified as `Retry`.
    connect_timeout: Duration,
}
impl TcpReachable {
    /// Connect timeout used by [`Self::new`]: 1s — short enough that
    /// a polling retry still gets several attempts, long enough that
    /// a slow host doesn't false-fail. Change it with
    /// [`Self::with_connect_timeout`].
    pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(1);

    /// Creates a probe for `address` with the default connect
    /// timeout.
    pub fn new(name: ProbeName, address: impl Into<String>) -> Self {
        let address = address.into();
        Self {
            name,
            address,
            connect_timeout: Self::DEFAULT_CONNECT_TIMEOUT,
        }
    }

    /// Returns the probe with its per-attempt connect timeout
    /// replaced by `t`.
    pub fn with_connect_timeout(self, t: Duration) -> Self {
        Self {
            connect_timeout: t,
            ..self
        }
    }

    /// The target address exactly as it was supplied.
    pub fn address(&self) -> &str {
        self.address.as_str()
    }
}
#[async_trait]
impl<T: Topology> Probe<T> for TcpReachable {
    fn name(&self) -> &ProbeName {
        &self.name
    }

    /// Makes one connect attempt bounded by `connect_timeout`.
    /// Success is `Ok`, exceeding the bound is `Retry`, and a connect
    /// error is classified by [`classify_connect_error`]. The
    /// topology is not consulted.
    async fn check(&self, _topology: &T) -> ProbeAttempt {
        let address = self.address.as_str();
        let bounded = tokio::time::timeout(self.connect_timeout, TcpStream::connect(address));
        let Ok(connected) = bounded.await else {
            return ProbeAttempt::Retry(format!(
                "connect to {} timed out after {:?}",
                self.address, self.connect_timeout
            ));
        };
        match connected {
            Ok(_stream) => ProbeAttempt::Ok(Some(format!("connected to {}", self.address))),
            Err(e) => classify_connect_error(address, e),
        }
    }
}
/// Sorts a connect error into "give up" or "keep polling".
///
/// Only `InvalidInput` and `Unsupported` are treated as definitive,
/// on the grounds that the address is unparseable / unroutable on
/// this host and retrying cannot help. Every other kind (connection
/// refused, reset, timed out, unreachable, ...) is consistent with
/// "service not up yet" and becomes a `Retry`.
fn classify_connect_error(address: &str, e: io::Error) -> ProbeAttempt {
    let kind = e.kind();
    if kind == io::ErrorKind::InvalidInput {
        return ProbeAttempt::Fatal(format!("invalid address {address}: {e}"));
    }
    if kind == io::ErrorKind::Unsupported {
        return ProbeAttempt::Fatal(format!("unsupported address {address}: {e}"));
    }
    ProbeAttempt::Retry(format!("connect to {address}: {e}"))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::companion::smoke::probe::{ProbeFailure, ProbeOutcome, RetryPolicy, run_probe};
use async_trait::async_trait;
use harmony::topology::PreparationOutcome;
use tokio::net::TcpListener;
/// Topology stand-in for the TCP probe tests: it is named "noop" and
/// reports `PreparationOutcome::Noop` from `ensure_ready`.
#[derive(Debug)]
struct NoopTopology;
#[async_trait]
impl Topology for NoopTopology {
    fn name(&self) -> &str {
        "noop"
    }
    // Nothing to prepare — always succeeds with `Noop`.
    async fn ensure_ready(
        &self,
    ) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
        Ok(PreparationOutcome::Noop)
    }
}
/// Test shorthand: builds a `ProbeName`, panicking if `s` is not a
/// valid name.
fn name(s: &str) -> ProbeName {
    ProbeName::try_new(s).unwrap()
}
/// With a listener bound on a kernel-assigned loopback port, a
/// single-shot probe must pass on its first attempt. Loopback keeps
/// the test hermetic: no outbound traffic, no name resolution.
#[tokio::test]
async fn passes_against_a_bound_listener() {
    let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let address = listener.local_addr().unwrap().to_string();
    let probe = TcpReachable::new(name("local-listener"), address);
    let outcome = run_probe(&probe, &NoopTopology, RetryPolicy::once()).await;
    match outcome {
        ProbeOutcome::Pass { attempts, .. } => assert_eq!(attempts, 1),
        other => panic!("expected pass, got {other:?}"),
    }
}
#[tokio::test]
async fn retries_then_passes_when_listener_appears_late() {
// Pick a free port, then race: probe runs immediately, then
// we bind 30 ms later. The polling retry should catch it.
let pre = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = pre.local_addr().unwrap();
drop(pre); // free the port
let probe = TcpReachable::new(name("late-listener"), addr.to_string())
.with_connect_timeout(Duration::from_millis(50));
let (probe_result, _) = tokio::join!(
run_probe(
&probe,
&NoopTopology,
RetryPolicy::polling(Duration::from_millis(10), Duration::from_secs(2)),
),
async {
tokio::time::sleep(Duration::from_millis(40)).await;
let listener = TcpListener::bind(addr).await.expect("rebind");
// Hold the listener until the probe is satisfied.
tokio::time::sleep(Duration::from_millis(500)).await;
drop(listener);
}
);
match probe_result {
ProbeOutcome::Pass { attempts, .. } => {
assert!(
attempts >= 2,
"expected at least 2 attempts, got {attempts}"
);
}
other => panic!("expected eventual pass, got {other:?}"),
}
}
/// Probing a port that was bound and released — so nothing listens
/// there — must keep retrying on the refused connects and end in
/// `Timeout` once the tight budget is spent.
#[tokio::test]
async fn times_out_on_unused_port() {
    let reserved = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let addr = reserved.local_addr().unwrap();
    drop(reserved);

    let probe = TcpReachable::new(name("nothing-here"), addr.to_string())
        .with_connect_timeout(Duration::from_millis(50));
    let policy = RetryPolicy::polling(Duration::from_millis(20), Duration::from_millis(100));

    match run_probe(&probe, &NoopTopology, policy).await {
        ProbeOutcome::Fail {
            failure: ProbeFailure::Timeout { .. },
            ..
        } => {}
        other => panic!("expected Timeout, got {other:?}"),
    }
}
/// A syntactically invalid address cannot be fixed by retrying, so
/// the run must be rejected on the very first attempt.
#[tokio::test]
async fn fatal_on_unparseable_address() {
    let probe = TcpReachable::new(name("bad-addr"), "not-an-address");
    let policy = RetryPolicy::polling(Duration::from_millis(20), Duration::from_secs(5));
    match run_probe(&probe, &NoopTopology, policy).await {
        ProbeOutcome::Fail {
            failure: ProbeFailure::Rejected { detail },
            attempts,
            ..
        } => {
            assert!(detail.contains("not-an-address"));
            assert_eq!(attempts, 1, "Fatal must short-circuit on attempt 1");
        }
        other => panic!("expected Rejected, got {other:?}"),
    }
}
}

View File

@@ -1,287 +0,0 @@
//! Smoke suite — an ordered sequence of [`Probe`]s with per-probe
//! retry budgets, and the report that comes back from running it.
//!
//! Suites are sequential by design in Phase 0. The mental model is
//! "first prove the basic connectivity, then prove the derived
//! state" — which is inherently ordered. Parallel stages are a
//! deferred extension; the public API never exposed sequentiality
//! as a guarantee, so adding `parallel_stage` later is additive.
//!
//! A [`SmokeReport`] is a pure value — easy to log, easy to
//! serialize for the dashboard, easy to assert against in tests. It
//! deliberately mirrors the
//! [`Outcome`](harmony::interpret::Outcome)/[`InterpretError`](harmony::interpret::InterpretError)
//! split that the rest of the framework uses: one type per "what
//! happened", not a Result soup.
use std::time::{Duration, Instant};
use harmony::topology::Topology;
use tracing::{info, info_span};
use super::probe::{Probe, ProbeName, ProbeOutcome, RetryPolicy, run_probe};
/// One stage in a [`SmokeSuite`]: a probe plus the retry budget that
/// applies to *this* invocation.
struct SmokeStage<T: Topology> {
    // Boxed so stages of different probe types can share one `Vec`.
    probe: Box<dyn Probe<T>>,
    // Passed to `run_probe` together with `probe` when the stage runs.
    policy: RetryPolicy,
}
/// An ordered, retry-aware composition of probes for one
/// [`SmokeTest`](super::SmokeTest).
///
/// Built via [`SmokeSuite::new`] + [`SmokeSuite::stage`]. Run via
/// [`SmokeSuite::run`]. The same probe type can appear multiple
/// times with different policies — that's the point of supplying
/// the policy at the stage level, not on the probe itself.
pub struct SmokeSuite<T: Topology> {
    // Stages in the order they were added, which is also the order
    // `run` executes them in.
    stages: Vec<SmokeStage<T>>,
}
impl<T: Topology> Default for SmokeSuite<T> {
fn default() -> Self {
Self::new()
}
}
/// Prints the suite as the list of its stage names, in order; the
/// retry policies are left out.
impl<T: Topology> std::fmt::Debug for SmokeSuite<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut names = Vec::with_capacity(self.stages.len());
        for stage in &self.stages {
            names.push(stage.probe.name().as_str());
        }
        f.debug_struct("SmokeSuite")
            .field("stages", &names)
            .finish()
    }
}
impl<T: Topology> SmokeSuite<T> {
pub fn new() -> Self {
Self { stages: Vec::new() }
}
/// Append a stage. Builder-style so an `assemble` method reads
/// like a checklist:
///
/// ```ignore
/// SmokeSuite::new()
/// .stage(TcpReachable::new(name, "nats:4222"), RetryPolicy::polling(...))
/// .stage(HttpHealthy::new(name, "/healthz"), RetryPolicy::polling(...))
/// ```
pub fn stage<P>(mut self, probe: P, policy: RetryPolicy) -> Self
where
P: Probe<T> + 'static,
{
self.stages.push(SmokeStage {
probe: Box::new(probe) as Box<dyn Probe<T>>,
policy,
});
self
}
pub fn len(&self) -> usize {
self.stages.len()
}
pub fn is_empty(&self) -> bool {
self.stages.is_empty()
}
/// Drive every stage in declared order. Stops on the first
/// failure — a smoke test is a chain of preconditions, so
/// continuing past a failure rarely produces useful information
/// and risks flooding the dashboard with cascade failures.
pub async fn run(self, topology: &T) -> SmokeReport {
let span = info_span!("smoke_suite", probes = self.stages.len());
let _guard = span.enter();
let start = Instant::now();
let mut probes = Vec::with_capacity(self.stages.len());
for stage in self.stages {
let name = stage.probe.name().clone();
info!(probe = %name, "smoke stage started");
let outcome = run_probe(&*stage.probe, topology, stage.policy).await;
let passed = outcome.passed();
info!(
probe = %name,
passed,
elapsed_ms = elapsed_ms(&outcome),
"smoke stage finished",
);
probes.push(ProbeReport { name, outcome });
if !passed {
break;
}
}
SmokeReport {
probes,
elapsed: start.elapsed(),
}
}
}
/// Elapsed time of a probe outcome in whole milliseconds, whichever
/// variant it is.
fn elapsed_ms(outcome: &ProbeOutcome) -> u64 {
    let (ProbeOutcome::Pass { elapsed, .. } | ProbeOutcome::Fail { elapsed, .. }) = outcome;
    elapsed.as_millis() as u64
}
/// What one probe stage produced. The `name` is repeated here (also
/// owned by the probe itself) so a `SmokeReport` is fully
/// self-contained — the probe `Box` is dropped after the suite runs.
#[derive(Debug, Clone)]
pub struct ProbeReport {
pub name: ProbeName,
pub outcome: ProbeOutcome,
}
impl ProbeReport {
pub fn passed(&self) -> bool {
self.outcome.passed()
}
}
/// What [`SmokeSuite::run`] returns. The dashboard reads probes top
/// to bottom; the orchestrator reads `passed()` to gate the deploy.
#[derive(Debug, Clone)]
pub struct SmokeReport {
pub probes: Vec<ProbeReport>,
pub elapsed: Duration,
}
impl SmokeReport {
/// `true` iff every probe in the suite passed.
///
/// An empty suite passes — there's nothing to fail. Callers who
/// want "a smoke test ran" semantics should check `!is_empty()`
/// in addition.
pub fn passed(&self) -> bool {
self.probes.iter().all(ProbeReport::passed)
}
pub fn is_empty(&self) -> bool {
self.probes.is_empty()
}
/// Iterator over only the failed probes — convenient for
/// rendering "X failures of Y" without filtering at the call
/// site.
pub fn failed(&self) -> impl Iterator<Item = &ProbeReport> {
self.probes.iter().filter(|p| !p.passed())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::companion::smoke::probe::{ProbeAttempt, ProbeFailure};
use async_trait::async_trait;
use harmony::topology::PreparationOutcome;
/// Topology stand-in for the suite tests: it is named "noop" and
/// reports `PreparationOutcome::Noop` from `ensure_ready`.
#[derive(Debug)]
struct NoopTopology;
#[async_trait]
impl Topology for NoopTopology {
    fn name(&self) -> &str {
        "noop"
    }
    // Nothing to prepare — always succeeds with `Noop`.
    async fn ensure_ready(
        &self,
    ) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
        Ok(PreparationOutcome::Noop)
    }
}
#[derive(Debug)]
struct Always {
name: ProbeName,
attempt: ProbeAttempt,
}
impl Always {
fn new(name: &str, attempt: ProbeAttempt) -> Self {
Self {
name: ProbeName::try_new(name).unwrap(),
attempt,
}
}
}
#[async_trait]
impl<T: Topology> Probe<T> for Always {
fn name(&self) -> &ProbeName {
&self.name
}
async fn check(&self, _t: &T) -> ProbeAttempt {
self.attempt.clone()
}
}
#[tokio::test]
async fn empty_suite_passes_vacuously() {
let report = SmokeSuite::<NoopTopology>::new().run(&NoopTopology).await;
assert!(report.passed());
assert!(report.is_empty());
}
#[tokio::test]
async fn all_pass_stages_aggregates_pass() {
let report = SmokeSuite::<NoopTopology>::new()
.stage(
Always::new("a", ProbeAttempt::Ok(None)),
RetryPolicy::once(),
)
.stage(
Always::new("b", ProbeAttempt::Ok(Some("ok".to_string()))),
RetryPolicy::once(),
)
.run(&NoopTopology)
.await;
assert!(report.passed());
assert_eq!(report.probes.len(), 2);
assert!(report.failed().count() == 0);
}
#[tokio::test]
async fn first_failure_short_circuits_remaining_stages() {
// The third stage must never run. We assert by name —
// its presence in the report would prove a regression.
let report = SmokeSuite::<NoopTopology>::new()
.stage(
Always::new("a", ProbeAttempt::Ok(None)),
RetryPolicy::once(),
)
.stage(
Always::new("b", ProbeAttempt::Fatal("nope".to_string())),
RetryPolicy::once(),
)
.stage(
Always::new("c", ProbeAttempt::Ok(None)),
RetryPolicy::once(),
)
.run(&NoopTopology)
.await;
assert!(!report.passed());
assert_eq!(report.probes.len(), 2);
assert_eq!(report.probes[0].name.as_str(), "a");
assert_eq!(report.probes[1].name.as_str(), "b");
// Cascading failures are noise — confirm only the actual
// culprit appears.
match &report.probes[1].outcome {
ProbeOutcome::Fail {
failure: ProbeFailure::Rejected { detail },
..
} => assert_eq!(detail, "nope"),
other => panic!("expected Rejected, got {other:?}"),
}
}
}