Files
harmony/harmony-k8s/src/node.rs
Jean-Gabriel Gill-Couture 787cc8feab
All checks were successful
Run Check Script / check (pull_request) Successful in 2m6s
Fix doc tests for harmony-k8s crate refactoring
- Updated harmony-k8s doc tests to import from harmony_k8s instead of harmony
- Changed CloudNativePgOperatorScore::default() to default_openshift()

This ensures doc tests work correctly after moving K8sClient to the harmony-k8s crate.
2026-03-07 15:50:39 -05:00

723 lines
25 KiB
Rust

use std::collections::BTreeMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use k8s_openapi::api::core::v1::{
ConfigMap, ConfigMapVolumeSource, Node, Pod, Volume, VolumeMount,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::{
Error,
api::{Api, DeleteParams, EvictParams, ListParams, PostParams},
core::ErrorResponse,
error::DiscoveryError,
};
use log::{debug, error, info, warn};
use tokio::time::sleep;
use crate::client::K8sClient;
use crate::helper::{self, PrivilegedPodConfig};
use crate::types::{DrainOptions, NodeFile};
impl K8sClient {
pub async fn cordon_node(&self, node_name: &str) -> Result<(), Error> {
Api::<Node>::all(self.client.clone())
.cordon(node_name)
.await?;
Ok(())
}
pub async fn uncordon_node(&self, node_name: &str) -> Result<(), Error> {
Api::<Node>::all(self.client.clone())
.uncordon(node_name)
.await?;
Ok(())
}
/// Waits for `node_name` to report Ready, giving up after 600 seconds.
///
/// # Errors
/// Returns the error of `wait_for_node_ready_with_timeout` when the node
/// does not become Ready in time.
pub async fn wait_for_node_ready(&self, node_name: &str) -> Result<(), Error> {
    let default_timeout = Duration::from_secs(600);
    self.wait_for_node_ready_with_timeout(node_name, default_timeout)
        .await
}
async fn wait_for_node_ready_with_timeout(
&self,
node_name: &str,
timeout: Duration,
) -> Result<(), Error> {
let api: Api<Node> = Api::all(self.client.clone());
let start = tokio::time::Instant::now();
let poll = Duration::from_secs(5);
loop {
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' did not become Ready within {timeout:?}"
))));
}
match api.get(node_name).await {
Ok(node) => {
if node
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.map(|conds| {
conds
.iter()
.any(|c| c.type_ == "Ready" && c.status == "True")
})
.unwrap_or(false)
{
debug!("Node '{node_name}' is Ready");
return Ok(());
}
}
Err(e) => debug!("Error polling node '{node_name}': {e}"),
}
sleep(poll).await;
}
}
async fn wait_for_node_not_ready(
&self,
node_name: &str,
timeout: Duration,
) -> Result<(), Error> {
let api: Api<Node> = Api::all(self.client.clone());
let start = tokio::time::Instant::now();
let poll = Duration::from_secs(5);
loop {
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' did not become NotReady within {timeout:?}"
))));
}
match api.get(node_name).await {
Ok(node) => {
let is_ready = node
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.map(|conds| {
conds
.iter()
.any(|c| c.type_ == "Ready" && c.status == "True")
})
.unwrap_or(false);
if !is_ready {
debug!("Node '{node_name}' is NotReady");
return Ok(());
}
}
Err(e) => debug!("Error polling node '{node_name}': {e}"),
}
sleep(poll).await;
}
}
/// Lists the pods, across all namespaces, whose `spec.nodeName` field equals
/// `node_name`.
///
/// # Errors
/// Returns the error of the list request.
async fn list_pods_on_node(&self, node_name: &str) -> Result<Vec<Pod>, Error> {
    let field_selector = format!("spec.nodeName={node_name}");
    let params = ListParams::default().fields(&field_selector);
    let pods: Api<Pod> = Api::all(self.client.clone());
    let listing = pods.list(&params).await?;
    Ok(listing.items)
}
/// Returns `true` when the pod carries the `kubernetes.io/config.mirror`
/// annotation (whatever its value).
fn is_mirror_pod(pod: &Pod) -> bool {
    match &pod.metadata.annotations {
        Some(annotations) => annotations.contains_key("kubernetes.io/config.mirror"),
        None => false,
    }
}
/// Returns `true` when any of the pod's owner references has kind
/// `DaemonSet`.
fn is_daemonset_pod(pod: &Pod) -> bool {
    pod.metadata
        .owner_references
        .iter()
        .flatten()
        .any(|owner| owner.kind == "DaemonSet")
}
/// Returns `true` when at least one volume in the pod spec is an `emptyDir`.
/// A pod without a spec or without volumes yields `false`.
fn has_emptydir_volume(pod: &Pod) -> bool {
    if let Some(volumes) = pod.spec.as_ref().and_then(|spec| spec.volumes.as_ref()) {
        volumes.iter().any(|volume| volume.empty_dir.is_some())
    } else {
        false
    }
}
/// Returns `true` when the pod's status phase is `Succeeded` or `Failed`.
/// A pod without a status or phase yields `false`.
fn is_completed_pod(pod: &Pod) -> bool {
    let phase = pod
        .status
        .as_ref()
        .and_then(|status| status.phase.as_deref());
    matches!(phase, Some("Succeeded" | "Failed"))
}
/// Sorts `pods` into those that must be evicted and those that can be left
/// alone, or reports every pod that prevents the drain.
///
/// Checks are applied in this order, first match wins:
/// 1. mirror pod → skipped
/// 2. completed pod → skipped
/// 3. DaemonSet pod → skipped when `options.ignore_daemonsets`, otherwise blocking
/// 4. pod with an `emptyDir` volume and `options.delete_emptydir_data` unset → blocking
/// 5. anything else → evictable
///
/// # Returns
/// `Ok((evictable, skipped))` where `skipped` holds one human-readable line
/// per skipped pod, or `Err(message)` listing all blocking pods.
fn classify_pods_for_drain(
    pods: &[Pod],
    options: &DrainOptions,
) -> Result<(Vec<Pod>, Vec<String>), String> {
    let mut to_evict: Vec<Pod> = Vec::new();
    let mut skip_notes: Vec<String> = Vec::new();
    let mut blockers: Vec<String> = Vec::new();

    for pod in pods {
        let qualified = format!(
            "{}/{}",
            pod.metadata.namespace.as_deref().unwrap_or("<unknown>"),
            pod.metadata.name.as_deref().unwrap_or("<unknown>"),
        );

        if Self::is_mirror_pod(pod) {
            skip_notes.push(format!("{qualified} (mirror pod)"));
        } else if Self::is_completed_pod(pod) {
            skip_notes.push(format!("{qualified} (completed)"));
        } else if Self::is_daemonset_pod(pod) {
            if options.ignore_daemonsets {
                skip_notes.push(format!("{qualified} (DaemonSet-managed)"));
            } else {
                blockers.push(format!(
                    "{qualified} is managed by a DaemonSet (set ignore_daemonsets to skip)"
                ));
            }
        } else if Self::has_emptydir_volume(pod) && !options.delete_emptydir_data {
            blockers.push(format!(
                "{qualified} uses emptyDir volumes (set delete_emptydir_data to allow eviction)"
            ));
        } else {
            to_evict.push(pod.clone());
        }
    }

    if blockers.is_empty() {
        return Ok((to_evict, skip_notes));
    }
    // Report every blocker at once so the caller can fix them in one pass.
    Err(format!(
        "Cannot drain node — the following pods block eviction:\n - {}",
        blockers.join("\n - ")
    ))
}
async fn evict_pod(&self, pod: &Pod) -> Result<(), Error> {
let name = pod.metadata.name.as_deref().unwrap_or_default();
let ns = pod.metadata.namespace.as_deref().unwrap_or_default();
debug!("Evicting pod {ns}/{name}");
Api::<Pod>::namespaced(self.client.clone(), ns)
.evict(name, &EvictParams::default())
.await
.map(|_| ())
}
/// Drains a node: cordon → classify → evict & wait.
pub async fn drain_node(&self, node_name: &str, options: &DrainOptions) -> Result<(), Error> {
debug!("Cordoning '{node_name}'");
self.cordon_node(node_name).await?;
let pods = self.list_pods_on_node(node_name).await?;
debug!("Found {} pod(s) on '{node_name}'", pods.len());
let (evictable, skipped) =
Self::classify_pods_for_drain(&pods, options).map_err(|msg| {
error!("{msg}");
Error::Discovery(DiscoveryError::MissingResource(msg))
})?;
for s in &skipped {
info!("Skipping pod: {s}");
}
if evictable.is_empty() {
info!("No pods to evict on '{node_name}'");
return Ok(());
}
info!("Evicting {} pod(s) from '{node_name}'", evictable.len());
let mut start = tokio::time::Instant::now();
let poll = Duration::from_secs(5);
let mut pending = evictable;
loop {
for pod in &pending {
match self.evict_pod(pod).await {
Ok(()) => {}
Err(Error::Api(ErrorResponse { code: 404, .. })) => {}
Err(Error::Api(ErrorResponse { code: 429, .. })) => {
warn!(
"PDB blocked eviction of {}/{}; will retry",
pod.metadata.namespace.as_deref().unwrap_or(""),
pod.metadata.name.as_deref().unwrap_or("")
);
}
Err(e) => {
error!(
"Failed to evict {}/{}: {e}",
pod.metadata.namespace.as_deref().unwrap_or(""),
pod.metadata.name.as_deref().unwrap_or("")
);
return Err(e);
}
}
}
sleep(poll).await;
let mut still_present = Vec::new();
for pod in pending {
let ns = pod.metadata.namespace.as_deref().unwrap_or_default();
let name = pod.metadata.name.as_deref().unwrap_or_default();
match self.get_pod(name, Some(ns)).await? {
Some(_) => still_present.push(pod),
None => debug!("Pod {ns}/{name} evicted"),
}
}
pending = still_present;
if pending.is_empty() {
break;
}
if start.elapsed() > options.timeout {
match helper::prompt_drain_timeout_action(
node_name,
pending.len(),
options.timeout,
)? {
helper::DrainTimeoutAction::Accept => break,
helper::DrainTimeoutAction::Retry => {
start = tokio::time::Instant::now();
continue;
}
helper::DrainTimeoutAction::Abort => {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Drain aborted. {} pod(s) remaining on '{node_name}'",
pending.len()
))));
}
}
}
debug!("Waiting for {} pod(s) on '{node_name}'", pending.len());
}
debug!("'{node_name}' drained successfully");
Ok(())
}
/// Safely reboots a node: drain → reboot → wait for Ready → uncordon.
pub async fn reboot_node(
&self,
node_name: &str,
drain_options: &DrainOptions,
timeout: Duration,
) -> Result<(), Error> {
info!("Starting reboot for '{node_name}'");
let node_api: Api<Node> = Api::all(self.client.clone());
let boot_id_before = node_api
.get(node_name)
.await?
.status
.as_ref()
.and_then(|s| s.node_info.as_ref())
.map(|ni| ni.boot_id.clone())
.ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' has no boot_id in status"
)))
})?;
info!("Draining '{node_name}'");
self.drain_node(node_name, drain_options).await?;
let start = tokio::time::Instant::now();
info!("Scheduling reboot for '{node_name}'");
let reboot_cmd =
"echo rebooting ; nohup bash -c 'sleep 5 && nsenter -t 1 -m -- systemctl reboot'";
match self
.run_privileged_command_on_node(node_name, reboot_cmd)
.await
{
Ok(_) => debug!("Reboot command dispatched"),
Err(e) => debug!("Reboot command error (expected if node began shutdown): {e}"),
}
info!("Waiting for '{node_name}' to begin shutdown");
self.wait_for_node_not_ready(node_name, timeout.saturating_sub(start.elapsed()))
.await?;
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Timeout during reboot of '{node_name}' (shutdown phase)"
))));
}
info!("Waiting for '{node_name}' to come back online");
self.wait_for_node_ready_with_timeout(node_name, timeout.saturating_sub(start.elapsed()))
.await?;
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Timeout during reboot of '{node_name}' (ready phase)"
))));
}
let boot_id_after = node_api
.get(node_name)
.await?
.status
.as_ref()
.and_then(|s| s.node_info.as_ref())
.map(|ni| ni.boot_id.clone())
.ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' has no boot_id after reboot"
)))
})?;
if boot_id_before == boot_id_after {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' did not actually reboot (boot_id unchanged: {boot_id_before})"
))));
}
info!("'{node_name}' rebooted ({boot_id_before} → {boot_id_after})");
self.uncordon_node(node_name).await?;
info!("'{node_name}' reboot complete ({:?})", start.elapsed());
Ok(())
}
/// Write a set of files to a node's filesystem via a privileged ephemeral pod.
///
/// The file contents are stored in a temporary ConfigMap that is mounted at
/// `/payload` in the pod; the node's root volume comes from
/// `helper::host_root_volume`. The pod runs a `bash` script that, for each
/// file, creates the parent directory under `/host`, copies the payload to
/// `/host<path>` and applies `chmod` with the file's mode (octal).
///
/// The ConfigMap and the pod bundle are deleted on a best-effort basis
/// whether or not the pod could be started or completed successfully.
///
/// NOTE(review): `file.path` is interpolated into the script inside double
/// quotes, so a path containing `"`, `$` or a backtick will be interpreted by
/// the shell. Callers must pass trusted paths.
///
/// # Returns
/// The value returned by `wait_for_pod_completion` for the writer pod.
///
/// # Errors
/// Returns an error when the distribution cannot be determined, the ConfigMap
/// cannot be created, the bundle cannot be applied, or the pod does not
/// complete successfully.
pub async fn write_files_to_node(
    &self,
    node_name: &str,
    files: &[NodeFile],
) -> Result<String, Error> {
    let ns = self.client.default_namespace();
    // Millisecond timestamp keeps resource names distinct between calls; a
    // clock set before the epoch yields 0 instead of panicking.
    let suffix = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    let name = format!("harmony-k8s-writer-{suffix}");
    debug!("Writing {} file(s) to '{node_name}'", files.len());

    // Resolve the distribution before creating anything in the cluster, so a
    // failure here leaves nothing behind to clean up.
    let distribution = self.get_k8s_distribution().await?;

    let mut data = BTreeMap::new();
    // `set -e` makes the script stop at the first failing command.
    let mut script = String::from("set -e\n");
    for (i, file) in files.iter().enumerate() {
        let key = format!("f{i}");
        data.insert(key.clone(), file.content.clone());
        script.push_str(&format!("mkdir -p \"$(dirname \"/host{}\")\"\n", file.path));
        script.push_str(&format!("cp \"/payload/{key}\" \"/host{}\"\n", file.path));
        script.push_str(&format!("chmod {:o} \"/host{}\"\n", file.mode, file.path));
    }

    let cm = ConfigMap {
        metadata: ObjectMeta {
            name: Some(name.clone()),
            namespace: Some(ns.to_string()),
            ..Default::default()
        },
        data: Some(data),
        ..Default::default()
    };
    let cm_api: Api<ConfigMap> = Api::namespaced(self.client.clone(), ns);
    cm_api.create(&PostParams::default(), &cm).await?;
    debug!("Created ConfigMap '{name}'");

    let (host_vol, host_mount) = helper::host_root_volume();
    let payload_vol = Volume {
        name: "payload".to_string(),
        config_map: Some(ConfigMapVolumeSource {
            name: name.clone(),
            ..Default::default()
        }),
        ..Default::default()
    };
    let payload_mount = VolumeMount {
        name: "payload".to_string(),
        mount_path: "/payload".to_string(),
        ..Default::default()
    };
    let bundle = helper::build_privileged_bundle(
        PrivilegedPodConfig {
            name: name.clone(),
            namespace: ns.to_string(),
            node_name: node_name.to_string(),
            container_name: "writer".to_string(),
            command: vec!["/bin/bash".to_string(), "-c".to_string(), script],
            volumes: vec![payload_vol, host_vol],
            volume_mounts: vec![payload_mount, host_mount],
            host_pid: false,
            host_network: false,
        },
        &distribution,
    );

    // Apply and wait inside one fallible unit so that a failed apply still
    // reaches the cleanup below instead of returning early and leaking the
    // ConfigMap (and whatever part of the bundle was created).
    let result = async {
        bundle.apply(self).await?;
        debug!("Created privileged pod bundle '{name}'");
        self.wait_for_pod_completion(&name, ns).await
    }
    .await;

    debug!("Cleaning up '{name}'");
    // Best-effort cleanup: errors are ignored on purpose so they never mask `result`.
    let _ = bundle.delete(self).await;
    let _ = cm_api.delete(&name, &DeleteParams::default()).await;
    result
}
/// Run a privileged command on a node via an ephemeral pod.
///
/// `command` is executed as `/bin/bash -c <command>` in a pod built by
/// `helper::build_privileged_bundle`, with the volume from
/// `helper::host_root_volume` mounted and `host_pid` / `host_network`
/// enabled. The pod bundle is deleted on a best-effort basis afterwards,
/// including when it could not be fully applied.
///
/// # Returns
/// The value returned by `wait_for_pod_completion` for the runner pod.
///
/// # Errors
/// Returns an error when the distribution cannot be determined, the bundle
/// cannot be applied, or the pod does not complete successfully.
pub async fn run_privileged_command_on_node(
    &self,
    node_name: &str,
    command: &str,
) -> Result<String, Error> {
    let namespace = self.client.default_namespace();
    // Millisecond timestamp keeps pod names distinct between calls; a clock
    // set before the epoch yields 0 instead of panicking.
    let suffix = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis();
    let name = format!("harmony-k8s-cmd-{suffix}");
    debug!("Running privileged command on '{node_name}': {command}");

    let (host_vol, host_mount) = helper::host_root_volume();
    let bundle = helper::build_privileged_bundle(
        PrivilegedPodConfig {
            name: name.clone(),
            namespace: namespace.to_string(),
            node_name: node_name.to_string(),
            container_name: "runner".to_string(),
            command: vec![
                "/bin/bash".to_string(),
                "-c".to_string(),
                command.to_string(),
            ],
            volumes: vec![host_vol],
            volume_mounts: vec![host_mount],
            host_pid: true,
            host_network: true,
        },
        &self.get_k8s_distribution().await?,
    );

    // Apply and wait inside one fallible unit so that a failed apply still
    // reaches the cleanup below instead of returning early and leaving
    // partially created resources behind.
    let result = async {
        bundle.apply(self).await?;
        debug!("Privileged pod '{name}' created");
        self.wait_for_pod_completion(&name, namespace).await
    }
    .await;

    debug!("Cleaning up '{name}'");
    // Best-effort cleanup: errors are ignored on purpose so they never mask `result`.
    let _ = bundle.delete(self).await;
    result
}
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
    //! Unit tests for `K8sClient::classify_pods_for_drain`. They only build
    //! in-memory `Pod` objects and never talk to a cluster.

    use k8s_openapi::api::core::v1::{EmptyDirVolumeSource, PodSpec, PodStatus, Volume};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};
    use super::*;

    // ── Fixtures ─────────────────────────────────────────────────────────

    /// A pod status that carries nothing but the given phase.
    fn status_in_phase(phase: &str) -> PodStatus {
        PodStatus {
            phase: Some(phase.to_string()),
            ..Default::default()
        }
    }

    /// A plain running pod with an empty spec.
    fn base_pod(name: &str, ns: &str) -> Pod {
        let metadata = ObjectMeta {
            name: Some(name.to_string()),
            namespace: Some(ns.to_string()),
            ..Default::default()
        };
        Pod {
            metadata,
            spec: Some(PodSpec::default()),
            status: Some(status_in_phase("Running")),
        }
    }

    /// A running pod carrying the mirror-pod annotation.
    fn mirror_pod(name: &str, ns: &str) -> Pod {
        let mut annotations = std::collections::BTreeMap::new();
        annotations.insert(
            "kubernetes.io/config.mirror".to_string(),
            "abc123".to_string(),
        );
        let mut pod = base_pod(name, ns);
        pod.metadata.annotations = Some(annotations);
        pod
    }

    /// A running pod owned by a DaemonSet.
    fn daemonset_pod(name: &str, ns: &str) -> Pod {
        let owner = OwnerReference {
            api_version: "apps/v1".to_string(),
            kind: "DaemonSet".to_string(),
            name: "some-ds".to_string(),
            uid: "uid-ds".to_string(),
            ..Default::default()
        };
        let mut pod = base_pod(name, ns);
        pod.metadata.owner_references = Some(vec![owner]);
        pod
    }

    /// A running pod with a single `emptyDir` volume.
    fn emptydir_pod(name: &str, ns: &str) -> Pod {
        let scratch = Volume {
            name: "scratch".to_string(),
            empty_dir: Some(EmptyDirVolumeSource::default()),
            ..Default::default()
        };
        let mut pod = base_pod(name, ns);
        pod.spec = Some(PodSpec {
            volumes: Some(vec![scratch]),
            ..Default::default()
        });
        pod
    }

    /// A pod whose status phase is `phase`.
    fn completed_pod(name: &str, ns: &str, phase: &str) -> Pod {
        let mut pod = base_pod(name, ns);
        pod.status = Some(status_in_phase(phase));
        pod
    }

    fn default_opts() -> DrainOptions {
        DrainOptions::default()
    }

    /// Classifies `pods`, panicking if classification reports blockers.
    fn classify(pods: &[Pod], opts: &DrainOptions) -> (Vec<Pod>, Vec<String>) {
        K8sClient::classify_pods_for_drain(pods, opts).unwrap()
    }

    /// Classifies `pods`, panicking unless classification reports blockers.
    fn classify_err(pods: &[Pod], opts: &DrainOptions) -> String {
        K8sClient::classify_pods_for_drain(pods, opts).unwrap_err()
    }

    // ── Single-category behaviour ────────────────────────────────────────

    #[test]
    fn empty_pod_list_returns_empty_vecs() {
        let (evictable, skipped) = classify(&[], &default_opts());
        assert!(evictable.is_empty());
        assert!(skipped.is_empty());
    }

    #[test]
    fn normal_pod_is_evictable() {
        let (evictable, skipped) = classify(&[base_pod("web", "default")], &default_opts());
        assert_eq!(evictable.len(), 1);
        assert!(skipped.is_empty());
    }

    #[test]
    fn mirror_pod_is_skipped() {
        let input = [mirror_pod("kube-apiserver", "kube-system")];
        let (evictable, skipped) = classify(&input, &default_opts());
        assert!(evictable.is_empty());
        assert!(skipped[0].contains("mirror pod"));
    }

    #[test]
    fn completed_pods_are_skipped() {
        for phase in ["Succeeded", "Failed"] {
            let input = [completed_pod("job", "batch", phase)];
            let (evictable, skipped) = classify(&input, &default_opts());
            assert!(evictable.is_empty());
            assert!(skipped[0].contains("completed"));
        }
    }

    #[test]
    fn daemonset_skipped_when_ignored() {
        let opts = DrainOptions {
            ignore_daemonsets: true,
            ..default_opts()
        };
        let (evictable, skipped) = classify(&[daemonset_pod("fluentd", "logging")], &opts);
        assert!(evictable.is_empty());
        assert!(skipped[0].contains("DaemonSet-managed"));
    }

    #[test]
    fn daemonset_blocks_when_not_ignored() {
        let opts = DrainOptions {
            ignore_daemonsets: false,
            ..default_opts()
        };
        let message = classify_err(&[daemonset_pod("fluentd", "logging")], &opts);
        assert!(message.contains("DaemonSet") && message.contains("logging/fluentd"));
    }

    #[test]
    fn emptydir_blocks_without_flag() {
        let opts = DrainOptions {
            delete_emptydir_data: false,
            ..default_opts()
        };
        let message = classify_err(&[emptydir_pod("cache", "default")], &opts);
        assert!(message.contains("emptyDir") && message.contains("default/cache"));
    }

    #[test]
    fn emptydir_evictable_with_flag() {
        let opts = DrainOptions {
            delete_emptydir_data: true,
            ..default_opts()
        };
        let (evictable, skipped) = classify(&[emptydir_pod("cache", "default")], &opts);
        assert_eq!(evictable.len(), 1);
        assert!(skipped.is_empty());
    }

    // ── Combinations and precedence ──────────────────────────────────────

    #[test]
    fn multiple_blocking_all_reported() {
        let opts = DrainOptions {
            ignore_daemonsets: false,
            delete_emptydir_data: false,
            ..default_opts()
        };
        let input = [daemonset_pod("ds", "ns1"), emptydir_pod("ed", "ns2")];
        let message = classify_err(&input, &opts);
        assert!(message.contains("ns1/ds") && message.contains("ns2/ed"));
    }

    #[test]
    fn mixed_pods_classified_correctly() {
        let input = [
            base_pod("web", "default"),
            mirror_pod("kube-apiserver", "kube-system"),
            daemonset_pod("fluentd", "logging"),
            completed_pod("job", "batch", "Succeeded"),
            base_pod("api", "default"),
        ];
        let (evictable, skipped) = classify(&input, &default_opts());
        let evicted_names: Vec<&str> = evictable
            .iter()
            .map(|pod| pod.metadata.name.as_deref().unwrap())
            .collect();
        assert_eq!(evicted_names, ["web", "api"]);
        assert_eq!(skipped.len(), 3);
    }

    #[test]
    fn mirror_checked_before_completed() {
        let mut pod = mirror_pod("static-etcd", "kube-system");
        pod.status = Some(status_in_phase("Succeeded"));
        let (_, skipped) = classify(&[pod], &default_opts());
        assert!(skipped[0].contains("mirror pod"), "got: {}", skipped[0]);
    }

    #[test]
    fn completed_checked_before_daemonset() {
        let mut pod = daemonset_pod("collector", "monitoring");
        pod.status = Some(status_in_phase("Failed"));
        let (_, skipped) = classify(&[pod], &default_opts());
        assert!(skipped[0].contains("completed"), "got: {}", skipped[0]);
    }
}