diff --git a/fleet/harmony-fleet-deploy/src/companion/mod.rs b/fleet/harmony-fleet-deploy/src/companion/mod.rs index 52d32d43..e2430517 100644 --- a/fleet/harmony-fleet-deploy/src/companion/mod.rs +++ b/fleet/harmony-fleet-deploy/src/companion/mod.rs @@ -13,11 +13,11 @@ //! - [`AgentObservation`] — derived view of a fleet agent's KV watch //! filter, computed from a typed `AgentConfig`. Used by the e2e //! harness to write desired-state keys the agent will pick up. -//! - [`smoke`] — the smoke-test contract. `Probe` trait, `SmokeSuite` -//! composition, `SmokeTest` companion, and the `deploy`/ -//! `deploy_with_smoke` wrappers that bind a Score's interpret to -//! an optional smoke run. Implements ADR-023 P4 ("deploy returns -//! only after smoke-test success") via the companion seam from P7. +//! - [`smoke`] — the smoke-test contract. One `SmokeTest` trait with +//! a single `verify` method returning a `SmokeReport`, plus +//! `deploy` / `deploy_with_smoke` free functions. Implements +//! ADR-023 P4 ("deploy returns only after smoke-test success") via +//! the companion seam from P7. pub mod agent; pub mod smoke; diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke.rs b/fleet/harmony-fleet-deploy/src/companion/smoke.rs new file mode 100644 index 00000000..6fe238f4 --- /dev/null +++ b/fleet/harmony-fleet-deploy/src/companion/smoke.rs @@ -0,0 +1,633 @@ +//! Smoke-test companion — minimal shape. +//! +//! Implements ADR-023 P4 ("deploy returns only after smoke-test +//! success") and P7 ("extend Scores with companions, not API +//! changes"). The earlier draft of this module decomposed smoke into +//! `Probe` / `ProbeAttempt` / `ProbeOutcome` / `ProbeFailure` / +//! `ProbeName` / `RetryPolicy` / `SmokeSuite` / `SmokeStage` / +//! `SmokeReport` / `SmokeTest` / `SmokeAssemblyError`. That was +//! cardinality matching gone overboard for what is, in the end, "run +//! an async function after deploy and surface a pipeline report." +//! This rewrite picks the smallest seam that still buys the things we +//! actually need. +//! +//! ## What we kept +//! +//! - [`SmokeTest`] — one trait, one async method. Implementer writes a +//! normal Rust function and returns a [`SmokeReport`]. +//! - Type-level pairing via the associated `type Interpret` +//! (suggested in PR #292 review): one smoke can cover every Score +//! that shares an Interpret (e.g. `HelmChartScore` + any preset on +//! top of it). **Pairing is convention-only in v0.3** — +//! `Score::create_interpret` returns a `Box>`, so +//! the `deploy_with_smoke` call site can't statically refuse a +//! mismatched smoke. Closing that gap is an additive refactor on +//! the `Score` trait (`type Interpret: Interpret;`) and is +//! deliberately deferred — the API shape supports it. +//! - [`deploy`] / [`deploy_with_smoke`] — free functions. `deploy` +//! runs the Score; `deploy_with_smoke` runs the Score then blocks +//! on smoke. +//! - Pipeline data the dashboard renders top-to-bottom +//! ([`CheckReport`] entries on [`SmokeReport`]). +//! +//! ## What we dropped, and what to do instead +//! +//! | Old type | Replaced by | +//! |---------------------------------------|-----------------------------------------------------------------------------| +//! | `Probe`, `ProbeAttempt`, `ProbeOutcome`, `ProbeFailure` | Write a normal `async fn` inside `verify`; push `CheckReport`s into a `Vec`. | +//! | `ProbeName` (validated newtype) | `&'static str` — the names come from our own static code, not user input. | +//! | `RetryPolicy` value type | [`poll_until`] free function. Pass the budget + interval directly. | +//! | `SmokeSuite` builder | A `Vec` collected inside `verify`. No intermediate type. | +//! | `TcpReachable` probe impl | [`tcp_reachable`] free function. Call it; it returns a `CheckReport`. | +//! +//! Reusability is preserved: the helpers ([`poll_until`], +//! [`tcp_reachable`]) compose by simple function call. A future +//! `http_healthy(url)` or `k8s_pod_ready(client, name)` belongs in +//! this module as another `async fn -> CheckReport`, not as another +//! trait. + +use std::future::Future; +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use harmony::interpret::{Interpret, InterpretError, Outcome}; +use harmony::inventory::Inventory; +use harmony::score::Score; +use harmony::topology::Topology; +use thiserror::Error; +use tracing::{info, info_span}; + +/// Pair an [`Interpret`] type with an async smoke verification. +/// +/// One method. Implementers compose their checks inline; no probe +/// trait, no suite builder, no retry-policy value type. If retries +/// are needed, wrap the closure with [`poll_until`]; otherwise just +/// `await` once. +/// +/// Associating with `Interpret` (not the concrete `Score`) lets a +/// single smoke impl cover a family of Scores that share an +/// Interpret. Convention-only enforcement in v0.3 — see the +/// module-level docs. +#[async_trait] +pub trait SmokeTest: Send + Sync { + /// The Interpret class this smoke verifies. Documentary in v0.3; + /// becomes compile-time enforceable once `Score` gains its own + /// `type Interpret`. + type Interpret: Interpret; + + /// Run all checks. Implementers are free to short-circuit on + /// first failure, or run every check to give the dashboard a + /// full per-step picture. The framework reads `report.passed()` + /// to gate the deploy and renders `report.checks` for the user. + async fn verify(&self, topology: &T) -> SmokeReport; +} + +/// Aggregated smoke result. Dashboards render `checks` top-to-bottom. +/// +/// A report is "passing" iff it has at least one check AND every +/// check passed. An empty report passes nothing — the framework +/// rejects empty reports the same way it rejects failing ones, so a +/// smoke impl that forgets to push any checks fails loudly instead +/// of letting a deploy through. +#[derive(Debug, Clone, Default)] +pub struct SmokeReport { + pub checks: Vec, +} + +impl SmokeReport { + pub fn passed(&self) -> bool { + !self.checks.is_empty() && self.checks.iter().all(|c| c.passed) + } + + pub fn is_empty(&self) -> bool { + self.checks.is_empty() + } + + /// Iterator over only the failures — handy for compact error + /// rendering without filtering at the call site. + pub fn failed(&self) -> impl Iterator { + self.checks.iter().filter(|c| !c.passed) + } +} + +/// One step's result, named for the dashboard and the operator's +/// log. `detail` is `Some` whenever there's something useful to show +/// (a connect error, the elapsed time, the response body's first +/// line) — leave it `None` for trivial passes. +#[derive(Debug, Clone)] +pub struct CheckReport { + pub name: &'static str, + pub passed: bool, + pub detail: Option, +} + +impl CheckReport { + pub fn pass(name: &'static str) -> Self { + Self { + name, + passed: true, + detail: None, + } + } + + pub fn pass_with(name: &'static str, detail: impl Into) -> Self { + Self { + name, + passed: true, + detail: Some(detail.into()), + } + } + + pub fn fail(name: &'static str, detail: impl Into) -> Self { + Self { + name, + passed: false, + detail: Some(detail.into()), + } + } +} + +/// Poll a fallible async closure until it returns `Ok` or `budget` +/// elapses. Returns a single [`CheckReport`] capturing the outcome. +/// +/// Use this inside [`SmokeTest::verify`] when you need retries. +/// Otherwise just `await` once and build a `CheckReport` from the +/// result — no helper required. +pub async fn poll_until( + name: &'static str, + budget: Duration, + interval: Duration, + check: F, +) -> CheckReport +where + F: Fn() -> Fut, + Fut: Future>, +{ + let deadline = Instant::now() + budget; + loop { + match check().await { + Ok(()) => return CheckReport::pass(name), + Err(e) => { + if Instant::now() >= deadline { + return CheckReport::fail(name, format!("did not pass within {budget:?}: {e}")); + } + tokio::time::sleep(interval).await; + } + } + } +} + +/// One TCP connect attempt against `addr`, gated by `timeout`. +/// +/// Wrap with [`poll_until`] for retry semantics: +/// +/// ```ignore +/// poll_until("nats-reachable", Duration::from_secs(30), Duration::from_secs(1), || async { +/// tcp_reachable("once", "nats:4222", Duration::from_secs(1)) +/// .await +/// .passed +/// .then_some(()) +/// .ok_or_else(|| "still not reachable".to_string()) +/// }).await +/// ``` +pub async fn tcp_reachable(name: &'static str, addr: &str, timeout: Duration) -> CheckReport { + match tokio::time::timeout(timeout, tokio::net::TcpStream::connect(addr)).await { + Ok(Ok(_)) => CheckReport::pass(name), + Ok(Err(e)) => CheckReport::fail(name, format!("connect failed: {e}")), + Err(_) => CheckReport::fail(name, format!("timeout after {timeout:?}")), + } +} + +/// What `deploy*` returns on success. `smoke` is `None` for the +/// no-smoke variant, `Some` (with `passed() == true`) otherwise. +#[derive(Debug, Clone)] +pub struct DeployOutcome { + pub interpret: Outcome, + pub smoke: Option, +} + +#[derive(Debug, Error)] +pub enum DeployError { + #[error("score interpret failed: {0}")] + Interpret(#[from] InterpretError), + #[error( + "smoke failed: {} of {} check(s) did not pass", + .report.failed().count(), + .report.checks.len(), + )] + SmokeFailed { + /// The Score deployed cleanly — preserved so the dashboard + /// can render the success message next to the smoke failure. + interpret: Outcome, + report: SmokeReport, + }, +} + +/// Deploy a Score; do **not** run any smoke. Equivalent to calling +/// `score.interpret(...)` directly — provided so callers can switch +/// between smoke / no-smoke at one site. +pub async fn deploy( + score: S, + inventory: &Inventory, + topology: &T, +) -> Result +where + S: Score, + T: Topology, +{ + let span = info_span!("deploy", score = score.name()); + let _g = span.enter(); + info!("deploy: starting interpret (no smoke)"); + let interpret = score.interpret(inventory, topology).await?; + info!(status = %interpret.status, "deploy: interpret returned"); + Ok(DeployOutcome { + interpret, + smoke: None, + }) +} + +/// Deploy a Score and block on its smoke test (ADR-023 P4). +/// +/// Returns `Ok` only when every check in the report passed. An empty +/// report is rejected the same way a failing one is — a smoke impl +/// that forgot to push checks fails the deploy rather than silently +/// passing. +pub async fn deploy_with_smoke( + score: S, + smoke: &ST, + inventory: &Inventory, + topology: &T, +) -> Result +where + S: Score, + ST: SmokeTest, + T: Topology, +{ + let span = info_span!("deploy", score = score.name(), smoke = true); + let _g = span.enter(); + info!("deploy: starting interpret"); + let interpret = score.interpret(inventory, topology).await?; + info!(status = %interpret.status, "deploy: interpret returned; running smoke"); + let report = smoke.verify(topology).await; + info!( + passed = report.passed(), + checks = report.checks.len(), + failed = report.failed().count(), + "deploy: smoke returned", + ); + if report.passed() { + Ok(DeployOutcome { + interpret, + smoke: Some(report), + }) + } else { + Err(DeployError::SmokeFailed { interpret, report }) + } +} + +#[cfg(test)] +mod tests { + //! Behavioural tests for the minimal smoke shape. + //! + //! These exercise the deploy ↔ smoke handoff against a trivial + //! in-process Score / Interpret / Topology so we don't depend on + //! any external infrastructure. The fixtures are intentionally + //! tiny — they only carry what each test actually touches. + + use super::*; + use async_trait::async_trait; + use harmony::data::Version; + use harmony::interpret::{InterpretName, InterpretStatus}; + use harmony::inventory::Inventory; + use harmony::topology::{PreparationError, PreparationOutcome}; + use harmony_types::id::Id; + use serde::Serialize; + use std::net::SocketAddr; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::{Arc, Mutex}; + use tokio::net::TcpListener; + + #[derive(Debug)] + struct NoopTopology; + + #[async_trait] + impl Topology for NoopTopology { + fn name(&self) -> &str { + "noop" + } + async fn ensure_ready(&self) -> Result { + Ok(PreparationOutcome::Noop) + } + } + + /// A `Score` that records execute() calls and returns a configured + /// outcome. Tracks invocations so a test can assert "interpret ran + /// exactly once". + #[derive(Debug, Clone)] + struct CountingScore { + executed: Arc, + fail: bool, + } + + impl CountingScore { + fn new() -> Self { + Self { + executed: Arc::new(AtomicBool::new(false)), + fail: false, + } + } + fn failing() -> Self { + Self { + executed: Arc::new(AtomicBool::new(false)), + fail: true, + } + } + } + + impl Serialize for CountingScore { + fn serialize(&self, s: S) -> Result { + s.serialize_str("CountingScore") + } + } + + #[async_trait] + impl Score for CountingScore { + fn create_interpret(&self) -> Box> { + Box::new(CountingInterpret { + executed: self.executed.clone(), + fail: self.fail, + }) + } + fn name(&self) -> String { + "CountingScore".into() + } + } + + #[derive(Debug)] + struct CountingInterpret { + executed: Arc, + fail: bool, + } + + #[async_trait] + impl Interpret for CountingInterpret { + fn get_name(&self) -> InterpretName { + InterpretName::Custom("counting") + } + fn get_version(&self) -> Version { + Version::from("0.1.0").unwrap() + } + fn get_status(&self) -> InterpretStatus { + InterpretStatus::QUEUED + } + fn get_children(&self) -> Vec { + vec![] + } + async fn execute( + &self, + _i: &Inventory, + _t: &NoopTopology, + ) -> Result { + self.executed.store(true, Ordering::SeqCst); + if self.fail { + Err(InterpretError::new( + "counting: configured to fail".to_string(), + )) + } else { + Ok(Outcome::success("counting: ok".to_string())) + } + } + } + + /// A test smoke that returns a pre-recorded report and records + /// whether `verify` was called. Lets each test inject the exact + /// report shape it wants to drive `deploy_with_smoke` through. + struct FixedSmoke { + report: Mutex>, + verify_called: Arc, + } + + impl FixedSmoke { + fn new(report: SmokeReport) -> Self { + Self { + report: Mutex::new(Some(report)), + verify_called: Arc::new(AtomicBool::new(false)), + } + } + } + + #[async_trait] + impl SmokeTest for FixedSmoke { + type Interpret = CountingInterpret; + async fn verify(&self, _t: &NoopTopology) -> SmokeReport { + self.verify_called.store(true, Ordering::SeqCst); + self.report + .lock() + .unwrap() + .take() + .expect("verify called more than once") + } + } + + #[tokio::test] + async fn deploy_runs_interpret_and_returns_outcome() { + let score = CountingScore::new(); + let executed = score.executed.clone(); + let outcome = deploy(score, &Inventory::empty(), &NoopTopology) + .await + .expect("clean interpret should produce Ok"); + assert!(executed.load(Ordering::SeqCst), "interpret must run"); + assert!(outcome.smoke.is_none(), "no-smoke variant has no report"); + assert_eq!(outcome.interpret.message, "counting: ok"); + } + + #[tokio::test] + async fn deploy_propagates_interpret_failure() { + let result = deploy(CountingScore::failing(), &Inventory::empty(), &NoopTopology).await; + assert!(matches!(result, Err(DeployError::Interpret(_)))); + } + + #[tokio::test] + async fn deploy_with_smoke_returns_report_on_success() { + let score = CountingScore::new(); + let smoke = FixedSmoke::new(SmokeReport { + checks: vec![CheckReport::pass("a"), CheckReport::pass("b")], + }); + let outcome = deploy_with_smoke(score, &smoke, &Inventory::empty(), &NoopTopology) + .await + .expect("passing smoke should produce Ok"); + let report = outcome.smoke.expect("smoke report present on success"); + assert!(report.passed()); + assert_eq!(report.checks.len(), 2); + assert!(smoke.verify_called.load(Ordering::SeqCst)); + } + + #[tokio::test] + async fn deploy_with_smoke_fails_when_any_check_fails() { + // Pin the deploy-blocks-on-smoke contract: a single failed + // check fails the whole deploy, regardless of how many + // passed. The `interpret` field of the error must still be + // populated so the dashboard can show what succeeded + // alongside what didn't. + let smoke = FixedSmoke::new(SmokeReport { + checks: vec![ + CheckReport::pass("a"), + CheckReport::fail("b", "bad"), + CheckReport::pass("c"), + ], + }); + let err = deploy_with_smoke( + CountingScore::new(), + &smoke, + &Inventory::empty(), + &NoopTopology, + ) + .await + .expect_err("failing smoke must error"); + match err { + DeployError::SmokeFailed { report, interpret } => { + assert!(!report.passed()); + assert_eq!(report.failed().count(), 1); + assert_eq!(interpret.message, "counting: ok"); + } + other => panic!("expected SmokeFailed, got {other:?}"), + } + } + + #[tokio::test] + async fn deploy_with_smoke_rejects_empty_report() { + // A smoke impl that forgot to push any checks must fail + // loudly. Silent pass-through would defeat the entire point + // of the contract. + let smoke = FixedSmoke::new(SmokeReport::default()); + let err = deploy_with_smoke( + CountingScore::new(), + &smoke, + &Inventory::empty(), + &NoopTopology, + ) + .await + .expect_err("empty smoke must error"); + assert!(matches!(err, DeployError::SmokeFailed { .. })); + } + + #[tokio::test] + async fn deploy_with_smoke_skips_smoke_when_interpret_fails() { + // Interpret error short-circuits — running smoke against a + // half-deployed component would produce confusing cascade + // failures. + let smoke = FixedSmoke::new(SmokeReport { + checks: vec![CheckReport::pass("never-runs")], + }); + let verify_called = smoke.verify_called.clone(); + let err = deploy_with_smoke( + CountingScore::failing(), + &smoke, + &Inventory::empty(), + &NoopTopology, + ) + .await + .expect_err("interpret failure must error"); + assert!(matches!(err, DeployError::Interpret(_))); + assert!( + !verify_called.load(Ordering::SeqCst), + "smoke must NOT run when interpret fails", + ); + } + + #[tokio::test] + async fn smoke_report_passed_requires_nonempty_and_all_pass() { + assert!(!SmokeReport::default().passed()); + assert!( + !SmokeReport { + checks: vec![CheckReport::pass("a"), CheckReport::fail("b", "x")], + } + .passed() + ); + assert!( + SmokeReport { + checks: vec![CheckReport::pass("a"), CheckReport::pass("b")], + } + .passed() + ); + } + + #[tokio::test] + async fn poll_until_returns_pass_when_check_succeeds_within_budget() { + let calls = Arc::new(AtomicBool::new(false)); + let c = calls.clone(); + let check = poll_until( + "eventually-ok", + Duration::from_secs(2), + Duration::from_millis(10), + move || { + let c = c.clone(); + async move { + if c.swap(true, Ordering::SeqCst) { + Ok(()) + } else { + Err("not yet".into()) + } + } + }, + ) + .await; + assert!(check.passed); + assert_eq!(check.name, "eventually-ok"); + } + + #[tokio::test] + async fn poll_until_returns_fail_after_budget_elapses() { + let check = poll_until( + "never-ok", + Duration::from_millis(50), + Duration::from_millis(10), + || async { Err("still nope".into()) }, + ) + .await; + assert!(!check.passed); + assert!( + check + .detail + .as_deref() + .is_some_and(|d| d.contains("still nope")), + "detail must carry the last error, got {:?}", + check.detail, + ); + } + + #[tokio::test] + async fn tcp_reachable_passes_against_real_listener() { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr: SocketAddr = listener.local_addr().unwrap(); + // Accept loop so connect() completes; otherwise SYN-RST. + tokio::spawn(async move { + let _ = listener.accept().await; + }); + + let check = tcp_reachable("loopback", &addr.to_string(), Duration::from_secs(1)).await; + assert!(check.passed, "loopback connect must succeed: {check:?}"); + } + + #[tokio::test] + async fn tcp_reachable_fails_with_timeout_when_no_listener() { + // RFC 5737 TEST-NET-1: guaranteed-non-routable; a SYN here + // will sit unanswered and we'll hit our own timeout deterministically. + let check = tcp_reachable("dead-net", "192.0.2.1:1", Duration::from_millis(150)).await; + assert!(!check.passed); + assert!( + check + .detail + .as_deref() + .is_some_and(|d| d.contains("timeout") || d.contains("connect failed")), + ); + } + + #[tokio::test] + async fn deploy_outcome_smoke_field_is_none_for_no_smoke_deploy() { + let outcome = deploy(CountingScore::new(), &Inventory::empty(), &NoopTopology) + .await + .unwrap(); + assert!(outcome.smoke.is_none()); + } +} diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/contract.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/contract.rs deleted file mode 100644 index 67165db2..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/contract.rs +++ /dev/null @@ -1,82 +0,0 @@ -//! The Score → SmokeTest pairing contract. -//! -//! [`SmokeTest`] is the companion trait that pairs a [`Score`] with -//! the probes that verify *that specific* Score's deployment worked. -//! The pairing is type-level via the `Score` associated type — the -//! compiler refuses to call -//! [`deploy_with_smoke`](super::deploy::deploy_with_smoke) with a -//! `FleetOperatorSmokeTest` for a `DnsScore`, because their -//! associated types don't match. This is the same trick that makes -//! `K8sBareTopology: PostgreSQL` an unsatisfied trait bound when you -//! accidentally try to run PostgreSQL on a topology that doesn't -//! advertise it — see ADR-024 §2 and JG's *For the Love of -//! Compilers* talk. -//! -//! ## Why not a method on `Score`? -//! -//! Per ADR-023 P7, framework capabilities attach as companions, not -//! as new methods on `Score`/`Interpret`. The base trait surface -//! stays one method (`create_interpret`). Smoke is *something paired -//! with* a Score, not *something on* a Score. The orthogonal -//! benefit: a Score can be deployed without smoke, can be deployed -//! with one of several smoke configurations (dev, prod, e2e), or -//! can have its smoke evolved independently of its interpret logic. -//! -//! ## Why `assemble` returns a value, not a future-of-checks? -//! -//! The probes a smoke test wants to run are usually known up front -//! from the Score's data — "the service name we just deployed, -//! check it on the cluster's default DNS". Resolving that into a -//! concrete `SmokeSuite` *before* the probes run means the dashboard -//! can render the pipeline (probe names, stage count) the moment -//! smoke starts. If it returned a `Stream`, the -//! pipeline shape wouldn't be knowable until the run completed. - -use async_trait::async_trait; -use thiserror::Error; - -use harmony::score::Score; -use harmony::topology::Topology; - -use super::suite::SmokeSuite; - -/// Pair a [`Score`] with the [`SmokeSuite`] that proves its deploy -/// converged. -/// -/// Implementors live next to the Score they pair with — the -/// `FleetOperatorSmokeTest` ships from this crate alongside -/// `FleetOperatorScore`, the same way [`crate::AgentObservation`] -/// lives alongside `FleetAgentScore`. This is the companion -/// placement rule from ADR-023 §"Extend Scores with companions". -#[async_trait] -pub trait SmokeTest: Send + Sync { - /// The Score this smoke test verifies. The type lock means - /// `SM::Score = S` is enforced at every call site. - type Score: Score; - - /// Build the [`SmokeSuite`] that will run after the Score's - /// `interpret` succeeds. Has access to the Score so probes can - /// be parameterized on the deploy's actual inputs (service - /// names, namespaces, ports), and to the topology so probes can - /// pull credentials / endpoints from declared capabilities. - async fn assemble( - &self, - score: &Self::Score, - topology: &T, - ) -> Result, SmokeAssemblyError>; -} - -/// Why a smoke test failed to *build* (before any probe runs). -/// -/// Distinct from probe failure — assembly failure means the Score -/// is in an unexpected shape (missing endpoint, can't resolve a -/// reference). The dashboard shows this differently from a probe -/// failure: it's an authoring / configuration error, not a deployed- -/// system error. -#[derive(Debug, Clone, Error, PartialEq, Eq)] -pub enum SmokeAssemblyError { - #[error("smoke test could not derive endpoint for {what}: {detail}")] - MissingEndpoint { what: String, detail: String }, - #[error("smoke test assembly failed: {0}")] - Other(String), -} diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/deploy.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/deploy.rs deleted file mode 100644 index aa1aa040..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/deploy.rs +++ /dev/null @@ -1,387 +0,0 @@ -//! `deploy` and `deploy_with_smoke` — the framework-glue wrappers -//! that bind a [`Score`]'s interpret to an optional [`SmokeTest`]. -//! -//! These are free functions, not methods on [`Score`]/[`Maestro`] -//! /[`Interpret`]. The framework's domain directory is left -//! untouched in this PR (ADR-023 P7 — "extend Scores with companions, -//! not API changes"). If a later PR proves this shape, promoting -//! `deploy_with_smoke` to a `Maestro` convenience is a one-line -//! additive change. -//! -//! ## Opting in or out -//! -//! Two entry points, one decision at the call site: -//! -//! - [`deploy`] — run the Score's interpret, no smoke. -//! - [`deploy_with_smoke`] — run the Score's interpret, then the -//! paired [`SmokeTest`]. Returns only after the smoke run -//! completes; a failed smoke is a deploy error per the project -//! rule "deploy blocks on smoke-test". -//! -//! Callers (CLI binaries, the e2e harness, the operator's -//! reconciler) choose which to call. There is no global config knob -//! and no per-Score override — the choice is one function call. - -use harmony::interpret::{InterpretError, Outcome}; -use harmony::inventory::Inventory; -use harmony::score::Score; -use harmony::topology::Topology; -use thiserror::Error; -use tracing::{info, info_span}; - -use super::contract::{SmokeAssemblyError, SmokeTest}; -use super::suite::SmokeReport; - -/// What [`deploy`]/[`deploy_with_smoke`] return on success. -/// -/// `interpret` is the underlying `Score::interpret` result — -/// preserved verbatim so callers that previously consumed `Outcome` -/// directly continue to work after switching to these wrappers. -/// `smoke` is `None` when the no-smoke variant was used; `Some` -/// otherwise, with `passed() == true` guaranteed (a failed smoke is -/// a [`DeployError::SmokeFailed`]). -#[derive(Debug, Clone)] -pub struct DeployOutcome { - pub interpret: Outcome, - pub smoke: Option, -} - -/// All the ways a deploy can fail. Three arms, each pointing at a -/// concrete cause the operator can act on: -/// -/// - `Interpret` — the Score's own work failed (helm error, kube -/// apply error, etc). Same shape callers got before smoke existed. -/// - `SmokeAssembly` — the Score deployed, but its `SmokeTest` -/// couldn't even *build* a suite. Typically means the test's -/// author is missing an endpoint reference; an authoring bug, not -/// a runtime bug. -/// - `SmokeFailed` — the Score deployed and the suite built, but -/// one or more probes failed. The full [`SmokeReport`] is -/// attached so the dashboard can render which stages passed and -/// which didn't, instead of a single opaque "deploy failed". -#[derive(Debug, Error)] -pub enum DeployError { - #[error("score interpret failed: {0}")] - Interpret(#[from] InterpretError), - #[error("smoke assembly failed: {0}")] - SmokeAssembly(#[from] SmokeAssemblyError), - #[error("smoke test failed ({} of {} probe(s) did not pass)", .report.failed().count(), .report.probes.len())] - SmokeFailed { - /// The Score deployed cleanly — `interpret` is preserved so - /// callers (and the dashboard) can render the underlying - /// success message alongside the smoke failure. - interpret: Outcome, - report: SmokeReport, - }, -} - -/// Deploy a Score without running any smoke checks. -/// -/// Returns immediately after `score.interpret` returns. Equivalent -/// to calling `score.interpret(...)` directly — provided here so -/// callers can switch between smoke / no-smoke without changing -/// the surrounding control flow. -pub async fn deploy( - score: S, - inventory: &Inventory, - topology: &T, -) -> Result -where - T: Topology, - S: Score, -{ - let span = info_span!("deploy", score = score.name()); - let _g = span.enter(); - info!("deploy: starting interpret (no smoke)"); - let interpret = score.interpret(inventory, topology).await?; - info!(status = %interpret.status, "deploy: interpret returned"); - Ok(DeployOutcome { - interpret, - smoke: None, - }) -} - -/// Deploy a Score, then run its paired SmokeTest. Block on the -/// suite — see project rule "deploy blocks on smoke-test". -/// -/// The type bound `SM: SmokeTest` is where the -/// compile-time pairing lives: the `SmokeTest` you pass must -/// declare *this* Score type as its associated `Score`. A mismatch -/// is rejected at the call site, not at runtime. (See ADR-024 §2 -/// and JG's compile-time-feedback principle.) -pub async fn deploy_with_smoke( - score: S, - smoke: SM, - inventory: &Inventory, - topology: &T, -) -> Result -where - T: Topology, - S: Score, - SM: SmokeTest, -{ - let span = info_span!("deploy", score = score.name(), smoke = true); - let _g = span.enter(); - - info!("deploy: starting interpret"); - let interpret = score.interpret(inventory, topology).await?; - info!(status = %interpret.status, "deploy: interpret returned, assembling smoke"); - - let suite = smoke.assemble(&score, topology).await?; - info!(stages = suite.len(), "deploy: smoke suite assembled"); - - let report = suite.run(topology).await; - info!( - passed = report.passed(), - elapsed_ms = report.elapsed.as_millis() as u64, - "deploy: smoke finished", - ); - - if !report.passed() { - return Err(DeployError::SmokeFailed { interpret, report }); - } - - Ok(DeployOutcome { - interpret, - smoke: Some(report), - }) -} - -// ---- tests ----------------------------------------------------------------- - -#[cfg(test)] -mod tests { - use super::*; - use crate::companion::smoke::probe::{Probe, ProbeAttempt, ProbeName, RetryPolicy}; - use crate::companion::smoke::suite::SmokeSuite; - use async_trait::async_trait; - use harmony::data::Version; - use harmony::interpret::{Interpret, InterpretName, InterpretStatus}; - use harmony::topology::PreparationOutcome; - use harmony_types::id::Id; - use serde::Serialize; - use std::sync::Mutex; - - // -- test fixtures: a NoopTopology + minimal Score + minimal Probe -------- - - #[derive(Debug)] - struct NoopTopology; - - #[async_trait] - impl Topology for NoopTopology { - fn name(&self) -> &str { - "noop" - } - async fn ensure_ready( - &self, - ) -> Result { - Ok(PreparationOutcome::Noop) - } - } - - /// Minimal Score whose interpret can be scripted to succeed or - /// fail. Demonstrates that `deploy_with_smoke` short-circuits - /// correctly when interpret fails. - #[derive(Debug, Clone, Serialize)] - struct TrivialScore { - label: String, - should_fail: bool, - } - - impl Score for TrivialScore { - fn create_interpret(&self) -> Box> { - Box::new(TrivialInterpret { - should_fail: self.should_fail, - label: self.label.clone(), - }) - } - fn name(&self) -> String { - format!("TrivialScore({})", self.label) - } - } - - #[derive(Debug)] - struct TrivialInterpret { - should_fail: bool, - label: String, - } - - #[async_trait] - impl Interpret for TrivialInterpret { - async fn execute( - &self, - _inventory: &Inventory, - _topology: &T, - ) -> Result { - if self.should_fail { - Err(InterpretError::new(format!( - "scripted failure: {}", - self.label - ))) - } else { - Ok(Outcome::success(format!("deployed {}", self.label))) - } - } - fn get_name(&self) -> InterpretName { - InterpretName::Custom("TrivialInterpret") - } - fn get_version(&self) -> Version { - Version::from("0.0.0").unwrap() - } - fn get_status(&self) -> InterpretStatus { - InterpretStatus::QUEUED - } - fn get_children(&self) -> Vec { - vec![] - } - } - - /// A probe whose every attempt is hardcoded. Lets us script - /// "this stage passes" or "this stage rejects" in tests. - #[derive(Debug)] - struct Always { - name: ProbeName, - attempt: ProbeAttempt, - } - impl Always { - fn new(name: &str, attempt: ProbeAttempt) -> Self { - Self { - name: ProbeName::try_new(name).unwrap(), - attempt, - } - } - } - #[async_trait] - impl Probe for Always { - fn name(&self) -> &ProbeName { - &self.name - } - async fn check(&self, _t: &T) -> ProbeAttempt { - self.attempt.clone() - } - } - - /// The shape we expect every fleet smoke test to follow: it - /// owns no state beyond what the Score gives it. - #[derive(Debug)] - struct TrivialSmoke { - outcomes: Mutex>, - } - impl TrivialSmoke { - fn new(outcomes: Vec) -> Self { - Self { - outcomes: Mutex::new(outcomes), - } - } - } - - #[async_trait] - impl SmokeTest for TrivialSmoke { - type Score = TrivialScore; - async fn assemble( - &self, - score: &Self::Score, - _topology: &T, - ) -> Result, SmokeAssemblyError> { - let mut suite = SmokeSuite::new(); - for (i, attempt) in self.outcomes.lock().unwrap().iter().enumerate() { - let name = format!("{}-probe-{i}", score.label); - suite = suite.stage(Always::new(&name, attempt.clone()), RetryPolicy::once()); - } - Ok(suite) - } - } - - // -- the actual contract tests -------------------------------------------- - - fn inv() -> Inventory { - Inventory::empty() - } - - #[tokio::test] - async fn deploy_without_smoke_returns_outcome_and_no_report() { - let s = TrivialScore { - label: "alpha".to_string(), - should_fail: false, - }; - let r = deploy(s, &inv(), &NoopTopology).await.expect("deploy ok"); - assert_eq!(r.interpret.status, InterpretStatus::SUCCESS); - assert!( - r.smoke.is_none(), - "no-smoke variant must not synthesize a report" - ); - } - - #[tokio::test] - async fn interpret_failure_short_circuits_before_smoke_assembly() { - // If interpret fails, we MUST NOT call `smoke.assemble` — - // there is nothing deployed to verify, and running smoke - // would produce confusing cascade failures in the report. - let s = TrivialScore { - label: "beta".to_string(), - should_fail: true, - }; - // Use a smoke that would panic if assembled — proves we - // never got there. - struct PanicSmoke; - #[async_trait] - impl SmokeTest for PanicSmoke { - type Score = TrivialScore; - async fn assemble( - &self, - _: &Self::Score, - _: &T, - ) -> Result, SmokeAssemblyError> { - panic!("assemble must not be called when interpret fails"); - } - } - match deploy_with_smoke(s, PanicSmoke, &inv(), &NoopTopology).await { - Err(DeployError::Interpret(e)) => { - assert!(format!("{e}").contains("scripted failure")); - } - other => panic!("expected Interpret error, got {other:?}"), - } - } - - #[tokio::test] - async fn all_probes_pass_yields_deployoutcome_with_report() { - let s = TrivialScore { - label: "gamma".to_string(), - should_fail: false, - }; - let smoke = TrivialSmoke::new(vec![ - ProbeAttempt::Ok(Some("first stage ok".to_string())), - ProbeAttempt::Ok(None), - ]); - let r = deploy_with_smoke(s, smoke, &inv(), &NoopTopology) - .await - .expect("deploy + smoke ok"); - let report = r.smoke.expect("smoke report present when smoke runs"); - assert!(report.passed()); - assert_eq!(report.probes.len(), 2); - } - - #[tokio::test] - async fn probe_failure_blocks_deploy() { - let s = TrivialScore { - label: "delta".to_string(), - should_fail: false, - }; - let smoke = TrivialSmoke::new(vec![ - ProbeAttempt::Ok(None), - ProbeAttempt::Fatal("definitely broken".to_string()), - ]); - match deploy_with_smoke(s, smoke, &inv(), &NoopTopology).await { - Err(DeployError::SmokeFailed { interpret, report }) => { - // The Score itself succeeded — that's preserved, so - // the dashboard can show "deploy converged, smoke - // disagrees". - assert_eq!(interpret.status, InterpretStatus::SUCCESS); - assert!(!report.passed()); - assert_eq!(report.failed().count(), 1); - assert_eq!(report.probes.last().unwrap().name.as_str(), "delta-probe-1"); - } - other => panic!("expected SmokeFailed, got {other:?}"), - } - } -} diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/mod.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/mod.rs deleted file mode 100644 index 35bd92dd..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/mod.rs +++ /dev/null @@ -1,62 +0,0 @@ -//! Smoke-test contract — the second Score companion (ADR-023 P7, -//! ADR-023 P4 "deploy returns only after smoke-test success"). -//! -//! ## The big picture -//! -//! ```text -//! ┌────────────┐ pair ┌──────────────┐ assemble ┌────────────┐ -//! │ *Score │◀─type─────│ SmokeTest │─────────────▶│ SmokeSuite │ -//! └────────────┘ locked └──────────────┘ └────────────┘ -//! ▲ │ -//! │ │ -//! deploy_with_smoke ─── interpret ─── then ── run probes ──▶ SmokeReport -//! ``` -//! -//! - [`Probe`] is the single-attempt unit, classifying its -//! observation as [`Ok`/`Retry`/`Fatal`](probe::ProbeAttempt). It's -//! also the stage the dashboard renders. -//! - [`SmokeSuite`] is an ordered sequence of probes with per-stage -//! [`RetryPolicy`]. Sequential by design; parallel stages deferred. -//! - [`SmokeTest`] pairs a [`Score`](harmony::score::Score) with a -//! suite by associated type — a `SmokeTest` -//! cannot be passed to `deploy_with_smoke(DnsScore, …)` because the -//! compiler refuses the mismatch. -//! - [`deploy`] / [`deploy_with_smoke`] are free functions that -//! bind a Score's interpret to an optional SmokeTest. Returns -//! only after the smoke run completes — see -//! [`DeployError::SmokeFailed`] for the failure shape. -//! -//! ## Where this lives -//! -//! Inside `harmony-fleet-deploy` for Phase 0, alongside the -//! existing [`AgentObservation`](super::AgentObservation) companion. -//! When a second consumer (PostgreSQL deploy, monitoring stack) -//! adopts this contract, the module promotes to a top-level -//! `harmony-smoke` crate with no API churn — the move is mechanical -//! because nothing in here depends on fleet types. -//! -//! ## What this does NOT do -//! -//! - **No new `HarmonyEvent` variants in Phase 0.** Stage progress -//! is emitted via `tracing::info!` events inside `info_span!`s. -//! A later PR can add `HarmonyEvent::SmokeStage{Started,Finished}` -//! if the dashboard wants the typed seam. -//! - **No `Score`/`Interpret` trait edits.** Per ADR-023 P7. The -//! smoke seam is purely companion-shaped. -//! - **No CLI flag yet.** Phase 1 wires `harmony-fleet-deploy -//! --no-smoke` once a real `FleetOperatorSmokeTest` exists. - -pub mod contract; -pub mod deploy; -pub mod probe; -pub mod probes; -pub mod suite; - -pub use contract::{SmokeAssemblyError, SmokeTest}; -pub use deploy::{DeployError, DeployOutcome, deploy, deploy_with_smoke}; -pub use probe::{ - InvalidProbeName, Probe, ProbeAttempt, ProbeFailure, ProbeName, ProbeOutcome, RetryPolicy, - run_probe, -}; -pub use probes::TcpReachable; -pub use suite::{ProbeReport, SmokeReport, SmokeSuite}; diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/probe.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/probe.rs deleted file mode 100644 index 2a4a0e5e..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/probe.rs +++ /dev/null @@ -1,515 +0,0 @@ -//! Probe contract — the single-attempt unit of a smoke run. -//! -//! A [`Probe`] is what gets visualized as one stage of the smoke -//! pipeline. Each implementation answers one question against a -//! deployed system: "is this port reachable", "did this pod become -//! ready", "is this KV key set to the value we expect". -//! -//! The retry loop is **not** a probe responsibility. A probe's -//! [`check`](Probe::check) returns one of three classifications via -//! [`ProbeAttempt`] — `Ok`, `Retry`, or `Fatal` — and [`run_probe`] -//! drives the polling timer + timeout budget around it. This keeps -//! probes single-responsibility and makes their unit tests -//! deterministic. -//! -//! Each public type here is cardinality-matched to the concept it -//! represents (see ADR-024 §2 and JG's *For the Love of Compilers* -//! talk). `ProbeName` is a validated newtype, not `String`, because -//! "an empty probe name" or "a probe name with a newline in it" -//! cannot mean anything useful and would corrupt the dashboard -//! rendering. `ProbeAttempt` is a three-arm sum, not `Result<(), Error>`, -//! because "not ready yet" and "definitively no" must drive different -//! orchestration paths. - -use std::fmt; -use std::time::{Duration, Instant}; - -use async_trait::async_trait; -use thiserror::Error; -use tracing::{debug, warn}; - -use harmony::topology::Topology; - -/// One attempted check by a probe. -/// -/// The probe classifies its own observation rather than letting the -/// orchestrator guess: a 5xx response is fundamentally different from -/// a connection refused, even if both look like "failure" to a naive -/// caller. The orchestrator ([`run_probe`]) uses this classification -/// to decide whether to keep polling or bail out. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ProbeAttempt { - /// The probe's invariant holds *right now*. Optional detail is - /// surfaced in the smoke report (e.g. "HTTP 200 in 12 ms", - /// "found 3 ready pods"). - Ok(Option), - /// The probe didn't observe what it's looking for yet, but the - /// state is consistent with eventual success. Keep polling until - /// the timeout. The string is the most recent observation, kept - /// so that a final timeout can quote the last thing we saw. - Retry(String), - /// The probe got a definitive negative answer — no amount of - /// further polling will change it. Skip the rest of the budget - /// and report failure now. Example: HTTP 404 from a URL that - /// must exist after deploy, or a `Service` with the wrong - /// selector. - Fatal(String), -} - -/// Result of a full probe run after [`run_probe`] has consumed the -/// retry budget (or short-circuited). -/// -/// This is the per-probe shape that lands in -/// [`crate::companion::smoke::SmokeReport`]. The orchestrator wraps -/// it with the [`ProbeName`] and the elapsed time so the dashboard -/// can render each stage independently. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ProbeOutcome { - /// At least one attempt returned [`ProbeAttempt::Ok`] within the - /// retry budget. - Pass { - elapsed: Duration, - attempts: u32, - detail: Option, - }, - /// Either we exhausted the budget with only `Retry`s (timeout) - /// or a single `Fatal` short-circuited us (rejected). - Fail { - elapsed: Duration, - attempts: u32, - failure: ProbeFailure, - }, -} - -impl ProbeOutcome { - pub fn passed(&self) -> bool { - matches!(self, Self::Pass { .. }) - } -} - -/// Why a probe failed. Two arms, deliberately — they map to two -/// different operator actions: "I waited and nothing happened" needs -/// a longer budget or a stuck-component investigation; "something -/// gave me a hard no" needs a configuration fix. -#[derive(Debug, Clone, PartialEq, Eq, Error)] -pub enum ProbeFailure { - #[error("timed out after {budget:?} (last observation: {last_observation})")] - Timeout { - budget: Duration, - last_observation: String, - }, - #[error("rejected: {detail}")] - Rejected { detail: String }, -} - -/// Polling parameters for one probe. -/// -/// `interval` is the wait between attempts. `timeout` is the total -/// wall-clock budget from the first attempt to the last; the loop -/// will not start a new attempt past this budget but will let an -/// in-flight one finish. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct RetryPolicy { - interval: Duration, - timeout: Duration, -} - -impl RetryPolicy { - /// Standard polling shape: try, wait `interval`, try again, until - /// either an `Ok`/`Fatal` happens or `timeout` elapses. - /// - /// Panics if `interval` is zero — a zero interval combined with - /// any non-trivial check would spin the executor and produce no - /// useful retry semantics. Express "run once" via [`Self::once`] - /// instead. - pub fn polling(interval: Duration, timeout: Duration) -> Self { - assert!( - !interval.is_zero(), - "RetryPolicy::polling requires a non-zero interval; use RetryPolicy::once for a single attempt", - ); - Self { interval, timeout } - } - - /// Single-shot: one attempt, no retries. Useful for probes whose - /// success criterion is binary the moment we check (e.g. "is - /// this string equal to that string"). - pub fn once() -> Self { - Self { - interval: Duration::from_millis(1), - timeout: Duration::ZERO, - } - } - - pub fn interval(&self) -> Duration { - self.interval - } - - pub fn timeout(&self) -> Duration { - self.timeout - } -} - -/// A validated probe name. -/// -/// Probe names appear in `tracing` events, error messages, the smoke -/// report, and the eventual dashboard. Constraining them at -/// construction means the dashboard never has to defend against -/// surprising bytes (control chars, newlines, etc). -#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct ProbeName(String); - -#[derive(Debug, Error, PartialEq, Eq)] -pub enum InvalidProbeName { - #[error("probe name must not be empty")] - Empty, - #[error("probe name must not exceed 128 bytes (got {0})")] - TooLong(usize), - #[error("probe name must not contain control characters")] - ControlChar, -} - -impl ProbeName { - pub fn try_new(s: impl Into) -> Result { - let s = s.into(); - if s.is_empty() { - return Err(InvalidProbeName::Empty); - } - if s.len() > 128 { - return Err(InvalidProbeName::TooLong(s.len())); - } - if s.chars().any(|c| c.is_control()) { - return Err(InvalidProbeName::ControlChar); - } - Ok(Self(s)) - } - - pub fn as_str(&self) -> &str { - &self.0 - } -} - -impl fmt::Display for ProbeName { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str(&self.0) - } -} - -/// One pipeline stage of a [`SmokeSuite`](super::SmokeSuite). -/// -/// Object-safe so a suite can hold a heterogeneous `Vec>>`. The associated retry behavior is supplied separately -/// by the suite (via [`RetryPolicy`]) so that the same probe type -/// can be reused with different budgets in different suites. -#[async_trait] -pub trait Probe: Send + Sync + std::fmt::Debug { - fn name(&self) -> &ProbeName; - async fn check(&self, topology: &T) -> ProbeAttempt; -} - -/// Drive a probe through its retry policy and return the consolidated -/// outcome. -/// -/// Behavior: -/// -/// - Every `interval` we run `probe.check(...)`. A successful attempt -/// wins immediately; a `Fatal` short-circuits to `Rejected`; a -/// `Retry` records its observation and we wait. -/// - If `timeout` is `Duration::ZERO` we run exactly one attempt -/// (the "once" shape). -/// - Otherwise we keep attempting until the elapsed wall time -/// exceeds `timeout`. The last `Retry` observation becomes the -/// `Timeout::last_observation` field so the failure message tells -/// the operator what we kept seeing. -/// -/// The function emits a `tracing` event per attempt at `debug` level -/// and one at `warn` for the final failure (so a Phase-1 dashboard -/// can subscribe via a `tracing` layer without us needing to add a -/// new `HarmonyEvent` variant yet). -pub async fn run_probe(probe: &P, topology: &T, policy: RetryPolicy) -> ProbeOutcome -where - T: Topology, - P: Probe + ?Sized, -{ - let start = Instant::now(); - let mut attempts: u32 = 0; - - loop { - attempts += 1; - // Each iteration produces either an early return (Ok / Fatal) - // or the most recent Retry observation. Threading that - // observation back into the next iteration only happens - // implicitly via the timeout check below, so there's no - // long-lived `last_observation` to lint over. - let last_observation = match probe.check(topology).await { - ProbeAttempt::Ok(detail) => { - debug!( - probe = %probe.name(), - attempts, - elapsed_ms = start.elapsed().as_millis() as u64, - "probe passed", - ); - return ProbeOutcome::Pass { - elapsed: start.elapsed(), - attempts, - detail, - }; - } - ProbeAttempt::Fatal(detail) => { - warn!( - probe = %probe.name(), - attempts, - %detail, - "probe rejected (fatal)", - ); - return ProbeOutcome::Fail { - elapsed: start.elapsed(), - attempts, - failure: ProbeFailure::Rejected { detail }, - }; - } - ProbeAttempt::Retry(obs) => { - debug!( - probe = %probe.name(), - attempts, - observation = %obs, - "probe retry", - ); - obs - } - }; - - if start.elapsed() >= policy.timeout { - warn!( - probe = %probe.name(), - attempts, - budget_ms = policy.timeout.as_millis() as u64, - last_observation = %last_observation, - "probe timed out", - ); - return ProbeOutcome::Fail { - elapsed: start.elapsed(), - attempts, - failure: ProbeFailure::Timeout { - budget: policy.timeout, - last_observation, - }, - }; - } - - tokio::time::sleep(policy.interval).await; - } -} - -#[cfg(test)] -mod tests { - use super::*; - use harmony::topology::PreparationOutcome; - use std::sync::Mutex; - use std::sync::atomic::{AtomicU32, Ordering}; - - // -- ProbeName ----------------------------------------------------------- - - #[test] - fn probe_name_accepts_normal_strings() { - assert!(ProbeName::try_new("nats-reachable").is_ok()); - assert!(ProbeName::try_new("HTTP / health").is_ok()); - } - - #[test] - fn probe_name_rejects_empty() { - assert_eq!(ProbeName::try_new(""), Err(InvalidProbeName::Empty)); - } - - #[test] - fn probe_name_rejects_control_chars() { - // Dashboard renders these as garbage — fail at construction. - assert_eq!( - ProbeName::try_new("bad\nname"), - Err(InvalidProbeName::ControlChar) - ); - } - - #[test] - fn probe_name_rejects_overlong() { - let long = "x".repeat(129); - assert_eq!( - ProbeName::try_new(long.clone()), - Err(InvalidProbeName::TooLong(129)) - ); - } - - // -- RetryPolicy --------------------------------------------------------- - - #[test] - fn retry_policy_once_is_single_attempt() { - let p = RetryPolicy::once(); - assert_eq!(p.timeout(), Duration::ZERO); - } - - #[test] - #[should_panic(expected = "non-zero interval")] - fn retry_policy_polling_rejects_zero_interval() { - // Zero interval + non-trivial check is a spin loop. The - // type system can't catch this, so a runtime assert at - // construction is the next best thing — fail loud, fail - // early, not on the operator's deploy log. - let _ = RetryPolicy::polling(Duration::ZERO, Duration::from_secs(5)); - } - - // -- run_probe orchestration -------------------------------------------- - - /// Test probe that returns a scripted sequence of attempts, then - /// loops forever on the last one. Lets us simulate - /// "retry-then-ok" without real network I/O. - #[derive(Debug)] - struct ScriptedProbe { - name: ProbeName, - script: Mutex>, - calls: AtomicU32, - } - - impl ScriptedProbe { - fn new(name: &str, script: Vec) -> Self { - Self { - name: ProbeName::try_new(name).unwrap(), - script: Mutex::new(script), - calls: AtomicU32::new(0), - } - } - } - - #[async_trait] - impl Probe for ScriptedProbe { - fn name(&self) -> &ProbeName { - &self.name - } - async fn check(&self, _t: &T) -> ProbeAttempt { - self.calls.fetch_add(1, Ordering::SeqCst); - let mut s = self.script.lock().unwrap(); - if s.len() == 1 { - s[0].clone() - } else { - s.remove(0) - } - } - } - - #[derive(Debug)] - struct NoopTopology; - - #[async_trait] - impl Topology for NoopTopology { - fn name(&self) -> &str { - "noop" - } - async fn ensure_ready( - &self, - ) -> Result { - Ok(PreparationOutcome::Noop) - } - } - - #[tokio::test] - async fn ok_on_first_attempt_yields_pass() { - let probe = ScriptedProbe::new("instant", vec![ProbeAttempt::Ok(Some("hi".to_string()))]); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(1), Duration::from_millis(50)), - ) - .await; - match outcome { - ProbeOutcome::Pass { - attempts, detail, .. - } => { - assert_eq!(attempts, 1); - assert_eq!(detail.as_deref(), Some("hi")); - } - other => panic!("expected Pass, got {other:?}"), - } - } - - #[tokio::test] - async fn retries_until_ok_within_budget() { - let probe = ScriptedProbe::new( - "eventual", - vec![ - ProbeAttempt::Retry("not yet (1)".to_string()), - ProbeAttempt::Retry("not yet (2)".to_string()), - ProbeAttempt::Ok(None), - ], - ); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(2), Duration::from_millis(500)), - ) - .await; - match outcome { - ProbeOutcome::Pass { attempts, .. } => assert_eq!(attempts, 3), - other => panic!("expected Pass after retries, got {other:?}"), - } - } - - #[tokio::test] - async fn fatal_short_circuits_without_consuming_full_budget() { - let probe = ScriptedProbe::new( - "definite-no", - vec![ProbeAttempt::Fatal("wrong selector".to_string())], - ); - let start = Instant::now(); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(5), Duration::from_secs(10)), - ) - .await; - // The whole point of Fatal is that we don't burn the budget. - // Generous upper bound — CI variance, not a real check. - assert!( - start.elapsed() < Duration::from_secs(1), - "Fatal should return immediately, took {:?}", - start.elapsed() - ); - match outcome { - ProbeOutcome::Fail { - failure: ProbeFailure::Rejected { detail }, - .. - } => assert_eq!(detail, "wrong selector"), - other => panic!("expected Rejected, got {other:?}"), - } - } - - #[tokio::test] - async fn always_retry_yields_timeout_with_last_observation() { - let probe = ScriptedProbe::new( - "never-ready", - vec![ProbeAttempt::Retry("still pending".to_string())], - ); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(5), Duration::from_millis(40)), - ) - .await; - match outcome { - ProbeOutcome::Fail { - failure: - ProbeFailure::Timeout { - budget, - last_observation, - }, - attempts, - .. - } => { - assert_eq!(budget, Duration::from_millis(40)); - assert_eq!(last_observation, "still pending"); - assert!( - attempts >= 2, - "expected at least two attempts, got {attempts}" - ); - } - other => panic!("expected Timeout, got {other:?}"), - } - } -} diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/probes/mod.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/probes/mod.rs deleted file mode 100644 index 8fe2e1a9..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/probes/mod.rs +++ /dev/null @@ -1,15 +0,0 @@ -//! First-party [`Probe`](super::Probe) implementations. -//! -//! Phase 0 ships exactly one — [`TcpReachable`] — to validate the -//! trait shape against real network I/O. Phase 1 adds HTTP, K8s -//! pod-ready, NATS KV. Each future probe is one self-contained -//! file under this module. -//! -//! Probes that need their own crate dependencies (e.g. a future -//! `HelmReleaseHealthy` that talks to the helm client) can be -//! gated behind a Cargo feature here without touching the core -//! smoke types. - -pub mod tcp; - -pub use tcp::TcpReachable; diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/probes/tcp.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/probes/tcp.rs deleted file mode 100644 index ce1dc7bc..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/probes/tcp.rs +++ /dev/null @@ -1,225 +0,0 @@ -//! `TcpReachable` — the smallest useful probe: open a TCP connection -//! to `host:port` within a timeout. -//! -//! Used as the first stage of practically every fleet smoke suite: -//! "before checking the operator's `/healthz`, prove the cluster IP -//! even routes". A connect-refused is a `Retry` (we may have caught -//! the service mid-startup); a DNS or address-syntax error is -//! `Fatal` (no amount of polling will fix it). -//! -//! Implemented with `tokio::net::TcpStream`. No new crate -//! dependencies — `tokio` is already pulled in with the `full` -//! feature by this crate's `Cargo.toml`. - -use std::io; -use std::time::Duration; - -use async_trait::async_trait; -use tokio::net::TcpStream; - -use harmony::topology::Topology; - -use crate::companion::smoke::probe::{Probe, ProbeAttempt, ProbeName}; - -/// Probe that connects to `host:port` and immediately drops the -/// socket. Pass on `connect succeeded`. -#[derive(Debug, Clone)] -pub struct TcpReachable { - name: ProbeName, - address: String, - connect_timeout: Duration, -} - -impl TcpReachable { - /// Default connect timeout is 1s — short enough that a polling - /// retry sees several attempts; long enough that a slow host - /// doesn't false-fail. Override with - /// [`Self::with_connect_timeout`]. - pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(1); - - pub fn new(name: ProbeName, address: impl Into) -> Self { - Self { - name, - address: address.into(), - connect_timeout: Self::DEFAULT_CONNECT_TIMEOUT, - } - } - - pub fn with_connect_timeout(mut self, t: Duration) -> Self { - self.connect_timeout = t; - self - } - - pub fn address(&self) -> &str { - &self.address - } -} - -#[async_trait] -impl Probe for TcpReachable { - fn name(&self) -> &ProbeName { - &self.name - } - - async fn check(&self, _topology: &T) -> ProbeAttempt { - let connect = TcpStream::connect(self.address.as_str()); - match tokio::time::timeout(self.connect_timeout, connect).await { - Ok(Ok(_stream)) => ProbeAttempt::Ok(Some(format!("connected to {}", self.address))), - Ok(Err(e)) => classify_connect_error(&self.address, e), - Err(_elapsed) => ProbeAttempt::Retry(format!( - "connect to {} timed out after {:?}", - self.address, self.connect_timeout - )), - } - } -} - -/// Most connect errors look like "not ready yet" — we want to keep -/// polling. A small set is definitive and should short-circuit the -/// retry budget. `InvalidInput` and `Unsupported` both mean the -/// address is unparseable / unroutable on this host — no number of -/// retries will fix that. -fn classify_connect_error(address: &str, e: io::Error) -> ProbeAttempt { - match e.kind() { - io::ErrorKind::InvalidInput => { - ProbeAttempt::Fatal(format!("invalid address {address}: {e}")) - } - io::ErrorKind::Unsupported => { - ProbeAttempt::Fatal(format!("unsupported address {address}: {e}")) - } - // ConnectionRefused / ConnectionReset / TimedOut / - // HostUnreachable / NetworkUnreachable / NotFound (rare) — - // all consistent with "service not up yet". Keep polling. - _ => ProbeAttempt::Retry(format!("connect to {address}: {e}")), - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::companion::smoke::probe::{ProbeFailure, ProbeOutcome, RetryPolicy, run_probe}; - use async_trait::async_trait; - use harmony::topology::PreparationOutcome; - use tokio::net::TcpListener; - - #[derive(Debug)] - struct NoopTopology; - - #[async_trait] - impl Topology for NoopTopology { - fn name(&self) -> &str { - "noop" - } - async fn ensure_ready( - &self, - ) -> Result { - Ok(PreparationOutcome::Noop) - } - } - - fn name(s: &str) -> ProbeName { - ProbeName::try_new(s).unwrap() - } - - #[tokio::test] - async fn passes_against_a_bound_listener() { - // Bind to a kernel-assigned port and prove the probe sees - // the listener. Using 127.0.0.1 keeps this hermetic — no - // outbound network, no flakiness from name resolution. - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = listener.local_addr().unwrap(); - let probe = TcpReachable::new(name("local-listener"), addr.to_string()); - match run_probe(&probe, &NoopTopology, RetryPolicy::once()).await { - ProbeOutcome::Pass { attempts, .. } => assert_eq!(attempts, 1), - other => panic!("expected pass, got {other:?}"), - } - } - - #[tokio::test] - async fn retries_then_passes_when_listener_appears_late() { - // Pick a free port, then race: probe runs immediately, then - // we bind 30 ms later. The polling retry should catch it. - let pre = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = pre.local_addr().unwrap(); - drop(pre); // free the port - - let probe = TcpReachable::new(name("late-listener"), addr.to_string()) - .with_connect_timeout(Duration::from_millis(50)); - - let (probe_result, _) = tokio::join!( - run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(10), Duration::from_secs(2)), - ), - async { - tokio::time::sleep(Duration::from_millis(40)).await; - let listener = TcpListener::bind(addr).await.expect("rebind"); - // Hold the listener until the probe is satisfied. - tokio::time::sleep(Duration::from_millis(500)).await; - drop(listener); - } - ); - - match probe_result { - ProbeOutcome::Pass { attempts, .. } => { - assert!( - attempts >= 2, - "expected at least 2 attempts, got {attempts}" - ); - } - other => panic!("expected eventual pass, got {other:?}"), - } - } - - #[tokio::test] - async fn times_out_on_unused_port() { - // Bind+drop to confirm the port is free in the kernel's - // sense, then probe against it with a tight budget. With - // nothing listening, the OS returns ECONNREFUSED, which we - // classify as Retry → budget expiry → Timeout. - let pre = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let addr = pre.local_addr().unwrap(); - drop(pre); - - let probe = TcpReachable::new(name("nothing-here"), addr.to_string()) - .with_connect_timeout(Duration::from_millis(50)); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(20), Duration::from_millis(100)), - ) - .await; - match outcome { - ProbeOutcome::Fail { - failure: ProbeFailure::Timeout { .. }, - .. - } => {} - other => panic!("expected Timeout, got {other:?}"), - } - } - - #[tokio::test] - async fn fatal_on_unparseable_address() { - // Connect to a syntactically invalid address. No amount of - // retrying will help; we should short-circuit to Fatal. - let probe = TcpReachable::new(name("bad-addr"), "not-an-address"); - let outcome = run_probe( - &probe, - &NoopTopology, - RetryPolicy::polling(Duration::from_millis(20), Duration::from_secs(5)), - ) - .await; - match outcome { - ProbeOutcome::Fail { - failure: ProbeFailure::Rejected { detail }, - attempts, - .. - } => { - assert!(detail.contains("not-an-address")); - assert_eq!(attempts, 1, "Fatal must short-circuit on attempt 1"); - } - other => panic!("expected Rejected, got {other:?}"), - } - } -} diff --git a/fleet/harmony-fleet-deploy/src/companion/smoke/suite.rs b/fleet/harmony-fleet-deploy/src/companion/smoke/suite.rs deleted file mode 100644 index a4de34fa..00000000 --- a/fleet/harmony-fleet-deploy/src/companion/smoke/suite.rs +++ /dev/null @@ -1,287 +0,0 @@ -//! Smoke suite — an ordered sequence of [`Probe`]s with per-probe -//! retry budgets, and the report that comes back from running it. -//! -//! Suites are sequential by design in Phase 0. The mental model is -//! "first prove the basic connectivity, then prove the derived -//! state" — which is inherently ordered. Parallel stages are a -//! deferred extension; the public API never exposed sequentiality -//! as a guarantee, so adding `parallel_stage` later is additive. -//! -//! A [`SmokeReport`] is a pure value — easy to log, easy to -//! serialize for the dashboard, easy to assert against in tests. It -//! deliberately mirrors the -//! [`Outcome`](harmony::interpret::Outcome)/[`InterpretError`](harmony::interpret::InterpretError) -//! split that the rest of the framework uses: one type per "what -//! happened", not a Result soup. - -use std::time::{Duration, Instant}; - -use harmony::topology::Topology; -use tracing::{info, info_span}; - -use super::probe::{Probe, ProbeName, ProbeOutcome, RetryPolicy, run_probe}; - -/// One stage in a [`SmokeSuite`]: a probe plus the retry budget that -/// applies to *this* invocation. -struct SmokeStage { - probe: Box>, - policy: RetryPolicy, -} - -/// An ordered, retry-aware composition of probes for one -/// [`SmokeTest`](super::SmokeTest). -/// -/// Built via [`SmokeSuite::new`] + [`SmokeSuite::stage`]. Run via -/// [`SmokeSuite::run`]. The same probe type can appear multiple -/// times with different policies — that's the point of supplying -/// the policy at the stage level, not on the probe itself. -pub struct SmokeSuite { - stages: Vec>, -} - -impl Default for SmokeSuite { - fn default() -> Self { - Self::new() - } -} - -impl std::fmt::Debug for SmokeSuite { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SmokeSuite") - .field( - "stages", - &self - .stages - .iter() - .map(|s| s.probe.name().as_str()) - .collect::>(), - ) - .finish() - } -} - -impl SmokeSuite { - pub fn new() -> Self { - Self { stages: Vec::new() } - } - - /// Append a stage. Builder-style so an `assemble` method reads - /// like a checklist: - /// - /// ```ignore - /// SmokeSuite::new() - /// .stage(TcpReachable::new(name, "nats:4222"), RetryPolicy::polling(...)) - /// .stage(HttpHealthy::new(name, "/healthz"), RetryPolicy::polling(...)) - /// ``` - pub fn stage

(mut self, probe: P, policy: RetryPolicy) -> Self - where - P: Probe + 'static, - { - self.stages.push(SmokeStage { - probe: Box::new(probe) as Box>, - policy, - }); - self - } - - pub fn len(&self) -> usize { - self.stages.len() - } - - pub fn is_empty(&self) -> bool { - self.stages.is_empty() - } - - /// Drive every stage in declared order. Stops on the first - /// failure — a smoke test is a chain of preconditions, so - /// continuing past a failure rarely produces useful information - /// and risks flooding the dashboard with cascade failures. - pub async fn run(self, topology: &T) -> SmokeReport { - let span = info_span!("smoke_suite", probes = self.stages.len()); - let _guard = span.enter(); - - let start = Instant::now(); - let mut probes = Vec::with_capacity(self.stages.len()); - - for stage in self.stages { - let name = stage.probe.name().clone(); - info!(probe = %name, "smoke stage started"); - let outcome = run_probe(&*stage.probe, topology, stage.policy).await; - let passed = outcome.passed(); - info!( - probe = %name, - passed, - elapsed_ms = elapsed_ms(&outcome), - "smoke stage finished", - ); - probes.push(ProbeReport { name, outcome }); - if !passed { - break; - } - } - - SmokeReport { - probes, - elapsed: start.elapsed(), - } - } -} - -fn elapsed_ms(outcome: &ProbeOutcome) -> u64 { - let d = match outcome { - ProbeOutcome::Pass { elapsed, .. } | ProbeOutcome::Fail { elapsed, .. } => *elapsed, - }; - d.as_millis() as u64 -} - -/// What one probe stage produced. The `name` is repeated here (also -/// owned by the probe itself) so a `SmokeReport` is fully -/// self-contained — the probe `Box` is dropped after the suite runs. -#[derive(Debug, Clone)] -pub struct ProbeReport { - pub name: ProbeName, - pub outcome: ProbeOutcome, -} - -impl ProbeReport { - pub fn passed(&self) -> bool { - self.outcome.passed() - } -} - -/// What [`SmokeSuite::run`] returns. The dashboard reads probes top -/// to bottom; the orchestrator reads `passed()` to gate the deploy. -#[derive(Debug, Clone)] -pub struct SmokeReport { - pub probes: Vec, - pub elapsed: Duration, -} - -impl SmokeReport { - /// `true` iff every probe in the suite passed. - /// - /// An empty suite passes — there's nothing to fail. Callers who - /// want "a smoke test ran" semantics should check `!is_empty()` - /// in addition. - pub fn passed(&self) -> bool { - self.probes.iter().all(ProbeReport::passed) - } - - pub fn is_empty(&self) -> bool { - self.probes.is_empty() - } - - /// Iterator over only the failed probes — convenient for - /// rendering "X failures of Y" without filtering at the call - /// site. - pub fn failed(&self) -> impl Iterator { - self.probes.iter().filter(|p| !p.passed()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::companion::smoke::probe::{ProbeAttempt, ProbeFailure}; - use async_trait::async_trait; - use harmony::topology::PreparationOutcome; - - #[derive(Debug)] - struct NoopTopology; - - #[async_trait] - impl Topology for NoopTopology { - fn name(&self) -> &str { - "noop" - } - async fn ensure_ready( - &self, - ) -> Result { - Ok(PreparationOutcome::Noop) - } - } - - #[derive(Debug)] - struct Always { - name: ProbeName, - attempt: ProbeAttempt, - } - - impl Always { - fn new(name: &str, attempt: ProbeAttempt) -> Self { - Self { - name: ProbeName::try_new(name).unwrap(), - attempt, - } - } - } - - #[async_trait] - impl Probe for Always { - fn name(&self) -> &ProbeName { - &self.name - } - async fn check(&self, _t: &T) -> ProbeAttempt { - self.attempt.clone() - } - } - - #[tokio::test] - async fn empty_suite_passes_vacuously() { - let report = SmokeSuite::::new().run(&NoopTopology).await; - assert!(report.passed()); - assert!(report.is_empty()); - } - - #[tokio::test] - async fn all_pass_stages_aggregates_pass() { - let report = SmokeSuite::::new() - .stage( - Always::new("a", ProbeAttempt::Ok(None)), - RetryPolicy::once(), - ) - .stage( - Always::new("b", ProbeAttempt::Ok(Some("ok".to_string()))), - RetryPolicy::once(), - ) - .run(&NoopTopology) - .await; - assert!(report.passed()); - assert_eq!(report.probes.len(), 2); - assert!(report.failed().count() == 0); - } - - #[tokio::test] - async fn first_failure_short_circuits_remaining_stages() { - // The third stage must never run. We assert by name — - // its presence in the report would prove a regression. - let report = SmokeSuite::::new() - .stage( - Always::new("a", ProbeAttempt::Ok(None)), - RetryPolicy::once(), - ) - .stage( - Always::new("b", ProbeAttempt::Fatal("nope".to_string())), - RetryPolicy::once(), - ) - .stage( - Always::new("c", ProbeAttempt::Ok(None)), - RetryPolicy::once(), - ) - .run(&NoopTopology) - .await; - assert!(!report.passed()); - assert_eq!(report.probes.len(), 2); - assert_eq!(report.probes[0].name.as_str(), "a"); - assert_eq!(report.probes[1].name.as_str(), "b"); - // Cascading failures are noise — confirm only the actual - // culprit appears. - match &report.probes[1].outcome { - ProbeOutcome::Fail { - failure: ProbeFailure::Rejected { detail }, - .. - } => assert_eq!(detail, "nope"), - other => panic!("expected Rejected, got {other:?}"), - } - } -}