Customer apps frequently need a one-shot setup step (DB migration,
config render, cache warm-up) to succeed before the long-running
service starts. Without init containers, each customer either inlines
the step into the service entrypoint (slow, racy, no failure surface)
or bolts on a sidecar that the platform can't introspect. This change
adds k8s-style init containers at the score layer so the contract is
the same one the customer already knows.

Score:
- New `InitContainer { name, image, args, env, volumes, timeout }`
in `harmony::modules::podman`.
- `PodmanV0Score.init_containers: Vec<InitContainer>` with
`#[serde(default)]` — pre-init-container wire payloads parse as an
empty vec and behave unchanged.
- `DEFAULT_INIT_CONTAINER_TIMEOUT = 300s`; timeout serializes as
whole seconds for operator readability.
- Idempotency is the customer's contract — documented at module
level: init containers re-run on every reconcile that needs a
fresh main container set.
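
As a rough illustration of the timeout handling above, here is a std-only sketch of default hydration and the whole-seconds wire form. The field types, the `hydrate_timeout` and `timeout_as_wire_secs` helpers, and the truncation of sub-second precision are assumptions made for the example; the real type relies on serde attributes that are not reproduced here.

```rust
use std::time::Duration;

/// Default named in the description: 300 seconds.
pub const DEFAULT_INIT_CONTAINER_TIMEOUT: Duration = Duration::from_secs(300);

/// Field names follow the description; the field types are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct InitContainer {
    pub name: String,
    pub image: String,
    pub args: Vec<String>,
    pub env: Vec<(String, String)>,
    pub volumes: Vec<String>,
    pub timeout: Duration,
}

/// Hypothetical helper: a payload that omits the timeout gets the default.
pub fn hydrate_timeout(wire_secs: Option<u64>) -> Duration {
    wire_secs
        .map(Duration::from_secs)
        .unwrap_or(DEFAULT_INIT_CONTAINER_TIMEOUT)
}

/// Hypothetical helper: the wire form is whole seconds, so any sub-second
/// remainder is dropped here (truncation is an assumption of this sketch).
pub fn timeout_as_wire_secs(timeout: Duration) -> u64 {
    timeout.as_secs()
}
```

A payload written before this change carries no init containers at all, so the hydration path only matters for new payloads that list an init container without an explicit timeout.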

Runtime contract:
- `ContainerRuntime::run_to_completion(spec, timeout) -> RunOutcome`
added to the domain trait. `RunOutcome::Exited { exit_code }`
vs `TimedOut { waited }` — distinct arms because the caller's
failure path is different (operator gets the exit code for
actionable diagnosis).
- Init containers are NOT surfaced via `list_managed_services`;
they're removed after they exit so the host's managed-container
surface stays bounded to long-running services.
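
To make the two-arm outcome concrete, the sketch below shows how a caller might branch on it. The `exit_code: i32` type and the `describe_failure` helper are hypothetical and only illustrate why the arms are kept distinct.

```rust
use std::time::Duration;

/// Mirrors the two arms named in the description; `i32` is an assumption.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunOutcome {
    Exited { exit_code: i32 },
    TimedOut { waited: Duration },
}

/// Hypothetical helper: map an outcome to an operator-facing result.
/// A zero exit is success; every other outcome keeps its own message.
pub fn describe_failure(name: &str, outcome: &RunOutcome) -> Result<(), String> {
    match outcome {
        RunOutcome::Exited { exit_code: 0 } => Ok(()),
        RunOutcome::Exited { exit_code } => Err(format!(
            "init container '{name}' exited with code {exit_code}"
        )),
        RunOutcome::TimedOut { waited } => Err(format!(
            "init container '{name}' timed out after {}s",
            waited.as_secs()
        )),
    }
}
```

Keeping the exit code in its own arm means the operator message can carry it verbatim, while the timeout arm reports how long the runtime waited.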

PodmanTopology implementation:
- Pre-remove any prior container with the same name (retry-safe).
- Restart policy forced to `No` — a retrying init defeats the
run-to-completion contract.
- `tokio::time::timeout` around `podman wait`; force-remove + return
`TimedOut` on deadline.
- Single 200ms retry on inspect for the libpod race where state can
briefly read `running` between `wait` returning and conmon writing
the exit code.
- `INIT_CONTAINER_LABEL` on every init container so operators can
`podman ps -a --filter label=...` to spot init failures.
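
The single inspect retry can be sketched as follows. This is a simplified, blocking stand-in: the implementation described above is async, and the `InspectState` enum, the `inspect_after_wait` function, and the closure-based inspect call are all hypothetical names introduced for the example.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Simplified container state as an inspect call might report it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InspectState {
    Running,
    Exited(i32),
}

/// Retry delay named in the description.
pub const INSPECT_RETRY_DELAY: Duration = Duration::from_millis(200);

/// Reads the state once. If it still says `Running` after `wait` has
/// returned, sleeps for `delay` and reads exactly one more time.
pub fn inspect_after_wait(
    mut inspect: impl FnMut() -> InspectState,
    delay: Duration,
) -> InspectState {
    match inspect() {
        InspectState::Running => {
            sleep(delay);
            inspect()
        }
        done => done,
    }
}
```

Bounding the retry to one attempt keeps the worst-case added latency at a single delay; a second `Running` read is returned as-is for the caller to handle.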

Interpret:
- Init containers run sequentially before any service. Non-zero exit
or timeout fails the deployment with a typed `InterpretError`
carrying the container name + cause.
- Success message reports both counts (init containers and services).
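
The sequential, fail-fast ordering can be sketched like this. `InitFailure` is a hypothetical stand-in for the typed `InterpretError` payload, and `run_init_containers` with its closure parameter is an assumption of the example, not the interpreter's actual signature.

```rust
use std::time::Duration;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunOutcome {
    Exited { exit_code: i32 },
    TimedOut { waited: Duration },
}

/// Hypothetical stand-in for the typed error: container name plus cause.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InitFailure {
    NonZeroExit { container: String, exit_code: i32 },
    TimedOut { container: String, waited: Duration },
}

/// Runs each init container in order and stops at the first failure.
/// On success, returns how many init containers completed.
pub fn run_init_containers(
    names: &[&str],
    mut run: impl FnMut(&str) -> RunOutcome,
) -> Result<usize, InitFailure> {
    for &name in names {
        match run(name) {
            RunOutcome::Exited { exit_code: 0 } => {}
            RunOutcome::Exited { exit_code } => {
                return Err(InitFailure::NonZeroExit {
                    container: name.to_string(),
                    exit_code,
                });
            }
            RunOutcome::TimedOut { waited } => {
                return Err(InitFailure::TimedOut {
                    container: name.to_string(),
                    waited,
                });
            }
        }
    }
    Ok(names.len())
}
```

Because the loop returns on the first failure, later init containers and all services are never started for a failed deployment.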

Tests (in tree):
- 3 new wire-format tests in `podman::score`: roundtrip, default
timeout hydration, ordering preservation.
- All 10 existing `podman::score` tests still pass; legacy roundtrip
test now also asserts `init_containers.is_empty()` as a wire-compat
canary.

Call-site updates (5 sites): all existing constructors of
`PodmanV0Score` add `init_containers: vec![]`: harmony_apply_deployment
example, fleet_load_test example, operator e2e, vm_deploy_lifecycle
e2e, vm_isolation e2e.

Deferred: per-version "run-once" semantics (customers can approximate
this with a marker file today); the agent-side handler for surfacing
init logs to the operator dashboard (covered by the logs companion
PR's deferred work).
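
For the marker-file workaround mentioned above, a customer-side init step might look roughly like the sketch below. The `run_once_per_version` function, the `.init-done-<version>` file name, and the assumption that `state_dir` sits on a volume that survives reconciles are all hypothetical; nothing here is provided by the platform.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Runs `step` only if no marker for `version` exists under `state_dir`,
/// then writes the marker. Returns whether the step actually ran.
/// The marker is written only after `step` succeeds, so a failed step
/// runs again on the next attempt.
pub fn run_once_per_version(
    state_dir: &Path,
    version: &str,
    step: impl FnOnce() -> io::Result<()>,
) -> io::Result<bool> {
    let marker = state_dir.join(format!(".init-done-{version}"));
    if marker.exists() {
        return Ok(false);
    }
    step()?;
    fs::write(&marker, b"")?;
    Ok(true)
}
```

This keeps the step idempotent from the platform's point of view: the init container still runs on every reconcile, but the expensive work is skipped once the marker for the current version exists.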
215 lines
6.9 KiB
Rust
use harmony::modules::fleet::operator::crd::{
    Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
    Rollout, RolloutStrategy,
};
use harmony_fleet_e2e::{StackOptions, shared_stack};
use harmony_reconciler_contracts::{BUCKET_DESIRED_STATE, DeploymentName, desired_state_key};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::Client;
use kube::api::{Api, DeleteParams, ObjectMeta, PostParams};
use std::sync::Arc;
use std::time::{Duration, Instant};

const E2E_ENV: &str = "HARMONY_FLEET_E2E";

fn e2e_enabled() -> bool {
    matches!(std::env::var(E2E_ENV).as_deref(), Ok("1" | "true"))
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn operator_adds_finalizer_to_fleet_deployment() -> anyhow::Result<()> {
    if !e2e_enabled() {
        skip_e2e();
        return Ok(());
    }

    let stack = operator_stack().await?;
    let deployments = fleet_deployments(&stack.namespace).await?;

    create_fleet_deployment(&deployments, "finalizer-test").await?;
    wait_for_finalizer(&deployments, "finalizer-test").await?;

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn operator_writes_desired_state_for_matching_device() -> anyhow::Result<()> {
    if !e2e_enabled() {
        skip_e2e();
        return Ok(());
    }

    let stack = operator_stack().await?;

    let client = Client::try_default().await?;
    // Device CRs are cluster-scoped; Deployment CRs are namespaced.
    let devices: Api<Device> = Api::all(client.clone());
    let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);

    create_device(&devices, "desired-state-device").await?;
    create_fleet_deployment(&deployments, "desired-state-test").await?;
    wait_for_desired_state_entry(&stack, "desired-state-device", "desired-state-test", true)
        .await?;

    Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn operator_deletes_desired_state_when_deployment_is_deleted() -> anyhow::Result<()> {
    if !e2e_enabled() {
        skip_e2e();
        return Ok(());
    }

    let stack = operator_stack().await?;

    let client = Client::try_default().await?;
    let devices: Api<Device> = Api::all(client.clone());
    let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);

    create_device(&devices, "cleanup-device").await?;
    create_fleet_deployment(&deployments, "cleanup-test").await?;
    wait_for_finalizer(&deployments, "cleanup-test").await?;
    wait_for_desired_state_entry(&stack, "cleanup-device", "cleanup-test", true).await?;

    deployments
        .delete("cleanup-test", &DeleteParams::default())
        .await?;

    wait_for_desired_state_entry(&stack, "cleanup-device", "cleanup-test", false).await?;

    Ok(())
}

async fn operator_stack() -> anyhow::Result<Arc<harmony_fleet_e2e::Stack>> {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .try_init();

    let stack = shared_stack(StackOptions {
        deploy_agent: false,
        deploy_operator: true,
        ..StackOptions::default()
    })
    .await?;

    stack.print_debug_info();
    assert!(stack.device_ids.is_empty());

    Ok(stack)
}

fn skip_e2e() {
    eprintln!(
        "skipping {E2E_ENV}-gated e2e test (set {E2E_ENV}=1 to run; \
         requires k3d + podman on PATH)"
    );
}

async fn fleet_deployments(namespace: &str) -> anyhow::Result<Api<Deployment>> {
    let client = Client::try_default().await?;
    Ok(Api::namespaced(client, namespace))
}

async fn create_device(devices: &Api<Device>, name: &str) -> anyhow::Result<()> {
    let device = Device::new(name, DeviceSpec { inventory: None });
    devices.create(&PostParams::default(), &device).await?;
    Ok(())
}

async fn create_fleet_deployment(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
    let deployment = Deployment {
        metadata: ObjectMeta {
            name: Some(name.to_string()),
            ..Default::default()
        },
        spec: DeploymentSpec {
            target_selector: LabelSelector::default(),
            score: ReconcileScore::PodmanV0(PodmanV0Score {
                services: vec![PodmanService {
                    name: "hello".to_string(),
                    image: "docker.io/library/hello-world:latest".to_string(),
                    ports: vec![],
                    env: vec![],
                    volumes: vec![],
                    restart_policy: Default::default(),
                }],
                init_containers: vec![],
            }),
            rollout: Rollout {
                strategy: RolloutStrategy::Immediate,
            },
        },
        status: None,
    };

    deployments
        .create(&PostParams::default(), &deployment)
        .await?;
    Ok(())
}

async fn wait_for_finalizer(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
    let deadline = Instant::now() + Duration::from_secs(30);
    loop {
        let created = deployments.get(name).await?;
        let finalizers = created.metadata.finalizers.unwrap_or_default();
        if finalizers
            .iter()
            .any(|finalizer| finalizer == "fleet.nationtech.io/finalizer")
        {
            break;
        }
        if Instant::now() >= deadline {
            anyhow::bail!(
                "timed out waiting for fleet finalizer; current finalizers: {:?}",
                finalizers
            );
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    Ok(())
}

async fn wait_for_desired_state_entry(
    stack: &harmony_fleet_e2e::Stack,
    device_id: &str,
    deployment: &str,
    expect_present: bool,
) -> anyhow::Result<()> {
    let nats_client = connect_admin_nats(stack).await?;
    let js = async_nats::jetstream::new(nats_client);
    let desired_state = js.get_key_value(BUCKET_DESIRED_STATE).await?;
    let deployment_name = DeploymentName::try_new(deployment)?;
    let desired_state_key = desired_state_key(device_id, &deployment_name);

    let deadline = Instant::now() + Duration::from_secs(30);
    loop {
        let exists = desired_state.get(&desired_state_key).await?.is_some();
        if exists == expect_present {
            break;
        }
        if Instant::now() >= deadline {
            let state = if expect_present { "present" } else { "deleted" };
            anyhow::bail!(
                "timed out waiting for desired-state key {desired_state_key} to be {state}"
            );
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    Ok(())
}

async fn connect_admin_nats(
    stack: &harmony_fleet_e2e::Stack,
) -> anyhow::Result<async_nats::Client> {
    async_nats::ConnectOptions::new()
        .user_and_password(stack.admin_user.clone(), stack.admin_pass.clone())
        .connect(&stack.nats_url)
        .await
        .map_err(Into::into)
}