use std::collections::BTreeMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use k8s_openapi::api::core::v1::{
    ConfigMap, ConfigMapVolumeSource, Node, Pod, Volume, VolumeMount,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::{
    Error,
    api::{Api, DeleteParams, EvictParams, ListParams, PostParams},
    core::ErrorResponse,
    error::DiscoveryError,
};
use log::{debug, error, info, warn};
use tokio::time::sleep;

use crate::client::K8sClient;
use crate::helper::{self, PrivilegedPodConfig};
use crate::types::{DrainOptions, NodeFile};
impl K8sClient {
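    /// Marks a node unschedulable (the equivalent of `kubectl cordon`).
    ///
    /// A minimal usage sketch (doc example; assumes the client is reachable
    /// at `harmony_k8s::client::K8sClient` against a configured cluster):
    ///
    /// ```no_run
    /// # async fn demo(client: &harmony_k8s::client::K8sClient) -> Result<(), kube::Error> {
    /// client.cordon_node("worker-1").await?;
    /// // ... perform maintenance, then make the node schedulable again ...
    /// client.uncordon_node("worker-1").await?;
    /// # Ok(())
    /// # }
    /// ```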
    pub async fn cordon_node(&self, node_name: &str) -> Result<(), Error> {
        Api::<Node>::all(self.client.clone())
            .cordon(node_name)
            .await?;
        Ok(())
    }

    /// Marks the node schedulable again (the equivalent of `kubectl uncordon`).
    pub async fn uncordon_node(&self, node_name: &str) -> Result<(), Error> {
        Api::<Node>::all(self.client.clone())
            .uncordon(node_name)
            .await?;
        Ok(())
    }

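    /// Waits until the node reports the `Ready` condition, polling every five
    /// seconds with a default timeout of ten minutes.
    ///
    /// A minimal sketch (doc example; the crate path is an assumption):
    ///
    /// ```no_run
    /// # async fn demo(client: &harmony_k8s::client::K8sClient) -> Result<(), kube::Error> {
    /// client.wait_for_node_ready("worker-1").await?;
    /// # Ok(())
    /// # }
    /// ```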
    pub async fn wait_for_node_ready(&self, node_name: &str) -> Result<(), Error> {
        self.wait_for_node_ready_with_timeout(node_name, Duration::from_secs(600))
            .await
    }

    async fn wait_for_node_ready_with_timeout(
        &self,
        node_name: &str,
        timeout: Duration,
    ) -> Result<(), Error> {
        let api: Api<Node> = Api::all(self.client.clone());
        let start = tokio::time::Instant::now();
        let poll = Duration::from_secs(5);
        loop {
            if start.elapsed() > timeout {
                return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
                    "Node '{node_name}' did not become Ready within {timeout:?}"
                ))));
            }
            match api.get(node_name).await {
                Ok(node) => {
                    if node
                        .status
                        .as_ref()
                        .and_then(|s| s.conditions.as_ref())
                        .map(|conds| {
                            conds
                                .iter()
                                .any(|c| c.type_ == "Ready" && c.status == "True")
                        })
                        .unwrap_or(false)
                    {
                        debug!("Node '{node_name}' is Ready");
                        return Ok(());
                    }
                }
                Err(e) => debug!("Error polling node '{node_name}': {e}"),
            }
            sleep(poll).await;
        }
    }

    async fn wait_for_node_not_ready(
        &self,
        node_name: &str,
        timeout: Duration,
    ) -> Result<(), Error> {
        let api: Api<Node> = Api::all(self.client.clone());
        let start = tokio::time::Instant::now();
        let poll = Duration::from_secs(5);
        loop {
            if start.elapsed() > timeout {
                return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
                    "Node '{node_name}' did not become NotReady within {timeout:?}"
                ))));
            }
            match api.get(node_name).await {
                Ok(node) => {
                    let is_ready = node
                        .status
                        .as_ref()
                        .and_then(|s| s.conditions.as_ref())
                        .map(|conds| {
                            conds
                                .iter()
                                .any(|c| c.type_ == "Ready" && c.status == "True")
                        })
                        .unwrap_or(false);
                    if !is_ready {
                        debug!("Node '{node_name}' is NotReady");
                        return Ok(());
                    }
                }
                Err(e) => debug!("Error polling node '{node_name}': {e}"),
            }
            sleep(poll).await;
        }
    }

    async fn list_pods_on_node(&self, node_name: &str) -> Result<Vec<Pod>, Error> {
        let api: Api<Pod> = Api::all(self.client.clone());
        Ok(api
            .list(&ListParams::default().fields(&format!("spec.nodeName={node_name}")))
            .await?
            .items)
    }

    fn is_mirror_pod(pod: &Pod) -> bool {
        pod.metadata
            .annotations
            .as_ref()
            .map(|a| a.contains_key("kubernetes.io/config.mirror"))
            .unwrap_or(false)
    }

    fn is_daemonset_pod(pod: &Pod) -> bool {
        pod.metadata
            .owner_references
            .as_ref()
            .map(|refs| refs.iter().any(|r| r.kind == "DaemonSet"))
            .unwrap_or(false)
    }

    fn has_emptydir_volume(pod: &Pod) -> bool {
        pod.spec
            .as_ref()
            .and_then(|s| s.volumes.as_ref())
            .map(|vols| vols.iter().any(|v| v.empty_dir.is_some()))
            .unwrap_or(false)
    }

    fn is_completed_pod(pod: &Pod) -> bool {
        pod.status
            .as_ref()
            .and_then(|s| s.phase.as_deref())
            .map(|phase| phase == "Succeeded" || phase == "Failed")
            .unwrap_or(false)
    }

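    /// Splits the pods found on a node into those that may be evicted and
    /// those that are skipped, or fails with a message naming every blocking
    /// pod. Checks apply in order: mirror pods, completed pods, DaemonSet
    /// pods, then emptyDir volumes (the unit tests below pin this precedence).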
    fn classify_pods_for_drain(
        pods: &[Pod],
        options: &DrainOptions,
    ) -> Result<(Vec<Pod>, Vec<String>), String> {
        let mut evictable = Vec::new();
        let mut skipped = Vec::new();
        let mut blocking = Vec::new();

        for pod in pods {
            let name = pod.metadata.name.as_deref().unwrap_or("<unknown>");
            let ns = pod.metadata.namespace.as_deref().unwrap_or("<unknown>");
            let qualified = format!("{ns}/{name}");

            if Self::is_mirror_pod(pod) {
                skipped.push(format!("{qualified} (mirror pod)"));
                continue;
            }
            if Self::is_completed_pod(pod) {
                skipped.push(format!("{qualified} (completed)"));
                continue;
            }
            if Self::is_daemonset_pod(pod) {
                if options.ignore_daemonsets {
                    skipped.push(format!("{qualified} (DaemonSet-managed)"));
                } else {
                    blocking.push(format!(
                        "{qualified} is managed by a DaemonSet (set ignore_daemonsets to skip)"
                    ));
                }
                continue;
            }
            if Self::has_emptydir_volume(pod) && !options.delete_emptydir_data {
                blocking.push(format!(
                    "{qualified} uses emptyDir volumes (set delete_emptydir_data to allow eviction)"
                ));
                continue;
            }
            evictable.push(pod.clone());
        }

        if !blocking.is_empty() {
            return Err(format!(
                "Cannot drain node — the following pods block eviction:\n - {}",
                blocking.join("\n - ")
            ));
        }
        Ok((evictable, skipped))
    }

    async fn evict_pod(&self, pod: &Pod) -> Result<(), Error> {
        let name = pod.metadata.name.as_deref().unwrap_or_default();
        let ns = pod.metadata.namespace.as_deref().unwrap_or_default();
        debug!("Evicting pod {ns}/{name}");
        Api::<Pod>::namespaced(self.client.clone(), ns)
            .evict(name, &EvictParams::default())
            .await
            .map(|_| ())
    }

    /// Drains a node: cordon → classify → evict & wait.
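    ///
    /// Eviction respects PodDisruptionBudgets: a 429 response is retried on
    /// the next poll and a 404 means the pod is already gone. A hedged usage
    /// sketch (doc example; the crate path and the option values shown are
    /// assumptions):
    ///
    /// ```no_run
    /// # async fn demo(client: &harmony_k8s::client::K8sClient) -> Result<(), kube::Error> {
    /// use harmony_k8s::types::DrainOptions;
    ///
    /// let opts = DrainOptions {
    ///     ignore_daemonsets: true,
    ///     delete_emptydir_data: true,
    ///     ..DrainOptions::default()
    /// };
    /// client.drain_node("worker-1", &opts).await?;
    /// # Ok(())
    /// # }
    /// ```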
    pub async fn drain_node(&self, node_name: &str, options: &DrainOptions) -> Result<(), Error> {
        debug!("Cordoning '{node_name}'");
        self.cordon_node(node_name).await?;

        let pods = self.list_pods_on_node(node_name).await?;
        debug!("Found {} pod(s) on '{node_name}'", pods.len());

        let (evictable, skipped) =
            Self::classify_pods_for_drain(&pods, options).map_err(|msg| {
                error!("{msg}");
                Error::Discovery(DiscoveryError::MissingResource(msg))
            })?;

        for s in &skipped {
            info!("Skipping pod: {s}");
        }
        if evictable.is_empty() {
            info!("No pods to evict on '{node_name}'");
            return Ok(());
        }
        info!("Evicting {} pod(s) from '{node_name}'", evictable.len());

        let mut start = tokio::time::Instant::now();
        let poll = Duration::from_secs(5);
        let mut pending = evictable;

        loop {
            // Re-issue evictions each round; pods blocked by a PDB (429) stay
            // in `pending` and are retried on the next pass.
            for pod in &pending {
                match self.evict_pod(pod).await {
                    Ok(()) => {}
                    // 404: the pod is already gone.
                    Err(Error::Api(ErrorResponse { code: 404, .. })) => {}
                    Err(Error::Api(ErrorResponse { code: 429, .. })) => {
                        warn!(
                            "PDB blocked eviction of {}/{}; will retry",
                            pod.metadata.namespace.as_deref().unwrap_or(""),
                            pod.metadata.name.as_deref().unwrap_or("")
                        );
                    }
                    Err(e) => {
                        error!(
                            "Failed to evict {}/{}: {e}",
                            pod.metadata.namespace.as_deref().unwrap_or(""),
                            pod.metadata.name.as_deref().unwrap_or("")
                        );
                        return Err(e);
                    }
                }
            }

            sleep(poll).await;

            // Keep only the pods that are still present on the cluster.
            let mut still_present = Vec::new();
            for pod in pending {
                let ns = pod.metadata.namespace.as_deref().unwrap_or_default();
                let name = pod.metadata.name.as_deref().unwrap_or_default();
                match self.get_pod(name, Some(ns)).await? {
                    Some(_) => still_present.push(pod),
                    None => debug!("Pod {ns}/{name} evicted"),
                }
            }
            pending = still_present;

            if pending.is_empty() {
                break;
            }

            if start.elapsed() > options.timeout {
                match helper::prompt_drain_timeout_action(
                    node_name,
                    pending.len(),
                    options.timeout,
                )? {
                    helper::DrainTimeoutAction::Accept => break,
                    helper::DrainTimeoutAction::Retry => {
                        start = tokio::time::Instant::now();
                        continue;
                    }
                    helper::DrainTimeoutAction::Abort => {
                        return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
                            "Drain aborted. {} pod(s) remaining on '{node_name}'",
                            pending.len()
                        ))));
                    }
                }
            }
            debug!("Waiting for {} pod(s) on '{node_name}'", pending.len());
        }

        debug!("'{node_name}' drained successfully");
        Ok(())
    }

    /// Safely reboots a node: drain → reboot → wait for Ready → uncordon.
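    ///
    /// The kernel `boot_id` is captured before and after and compared to
    /// verify the node really rebooted. A hedged sketch (doc example; the
    /// crate path is an assumption):
    ///
    /// ```no_run
    /// # async fn demo(client: &harmony_k8s::client::K8sClient) -> Result<(), kube::Error> {
    /// use std::time::Duration;
    /// use harmony_k8s::types::DrainOptions;
    ///
    /// client
    ///     .reboot_node("worker-1", &DrainOptions::default(), Duration::from_secs(900))
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```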
    pub async fn reboot_node(
        &self,
        node_name: &str,
        drain_options: &DrainOptions,
        timeout: Duration,
    ) -> Result<(), Error> {
        info!("Starting reboot for '{node_name}'");
        let node_api: Api<Node> = Api::all(self.client.clone());

        let boot_id_before = node_api
            .get(node_name)
            .await?
            .status
            .as_ref()
            .and_then(|s| s.node_info.as_ref())
            .map(|ni| ni.boot_id.clone())
            .ok_or_else(|| {
                Error::Discovery(DiscoveryError::MissingResource(format!(
                    "Node '{node_name}' has no boot_id in status"
                )))
            })?;

        info!("Draining '{node_name}'");
        self.drain_node(node_name, drain_options).await?;

        let start = tokio::time::Instant::now();

        info!("Scheduling reboot for '{node_name}'");
        // The reboot is detached (nohup + sleep) so the command can return
        // before the node goes down.
        let reboot_cmd =
            "echo rebooting ; nohup bash -c 'sleep 5 && nsenter -t 1 -m -- systemctl reboot'";
        match self
            .run_privileged_command_on_node(node_name, reboot_cmd)
            .await
        {
            Ok(_) => debug!("Reboot command dispatched"),
            Err(e) => debug!("Reboot command error (expected if node began shutdown): {e}"),
        }

        info!("Waiting for '{node_name}' to begin shutdown");
        self.wait_for_node_not_ready(node_name, timeout.saturating_sub(start.elapsed()))
            .await?;

        if start.elapsed() > timeout {
            return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
                "Timeout during reboot of '{node_name}' (shutdown phase)"
            ))));
        }

        info!("Waiting for '{node_name}' to come back online");
        self.wait_for_node_ready_with_timeout(node_name, timeout.saturating_sub(start.elapsed()))
            .await?;

        if start.elapsed() > timeout {
            return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
                "Timeout during reboot of '{node_name}' (ready phase)"
            ))));
        }

        let boot_id_after = node_api
            .get(node_name)
            .await?
            .status
            .as_ref()
            .and_then(|s| s.node_info.as_ref())
            .map(|ni| ni.boot_id.clone())
            .ok_or_else(|| {
                Error::Discovery(DiscoveryError::MissingResource(format!(
                    "Node '{node_name}' has no boot_id after reboot"
                )))
            })?;

        if boot_id_before == boot_id_after {
            return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
                "Node '{node_name}' did not actually reboot (boot_id unchanged: {boot_id_before})"
            ))));
        }

        info!("'{node_name}' rebooted ({boot_id_before} → {boot_id_after})");
        self.uncordon_node(node_name).await?;
        info!("'{node_name}' reboot complete ({:?})", start.elapsed());
        Ok(())
    }

    /// Write a set of files to a node's filesystem via a privileged ephemeral pod.
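    ///
    /// Contents travel in a ConfigMap and are copied into place under the
    /// node's root mount. A hedged sketch (doc example; the crate path and
    /// the exact `NodeFile` fields are assumptions based on how they are
    /// used here):
    ///
    /// ```no_run
    /// # async fn demo(client: &harmony_k8s::client::K8sClient) -> Result<(), kube::Error> {
    /// use harmony_k8s::types::NodeFile;
    ///
    /// let files = vec![NodeFile {
    ///     path: "/etc/example/config.toml".to_string(),
    ///     content: "key = \"value\"\n".to_string(),
    ///     mode: 0o644,
    /// }];
    /// let output = client.write_files_to_node("worker-1", &files).await?;
    /// println!("{output}");
    /// # Ok(())
    /// # }
    /// ```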
    pub async fn write_files_to_node(
        &self,
        node_name: &str,
        files: &[NodeFile],
    ) -> Result<String, Error> {
        let ns = self.client.default_namespace();
        let suffix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis();
        let name = format!("harmony-k8s-writer-{suffix}");

        debug!("Writing {} file(s) to '{node_name}'", files.len());

        // Stage each file in a ConfigMap and build a script that copies it
        // onto the host mount with the requested mode.
        let mut data = BTreeMap::new();
        let mut script = String::from("set -e\n");
        for (i, file) in files.iter().enumerate() {
            let key = format!("f{i}");
            data.insert(key.clone(), file.content.clone());
            script.push_str(&format!("mkdir -p \"$(dirname \"/host{}\")\"\n", file.path));
            script.push_str(&format!("cp \"/payload/{key}\" \"/host{}\"\n", file.path));
            script.push_str(&format!("chmod {:o} \"/host{}\"\n", file.mode, file.path));
        }

        let cm = ConfigMap {
            metadata: ObjectMeta {
                name: Some(name.clone()),
                namespace: Some(ns.to_string()),
                ..Default::default()
            },
            data: Some(data),
            ..Default::default()
        };

        let cm_api: Api<ConfigMap> = Api::namespaced(self.client.clone(), ns);
        cm_api.create(&PostParams::default(), &cm).await?;
        debug!("Created ConfigMap '{name}'");

        let (host_vol, host_mount) = helper::host_root_volume();
        let payload_vol = Volume {
            name: "payload".to_string(),
            config_map: Some(ConfigMapVolumeSource {
                name: name.clone(),
                ..Default::default()
            }),
            ..Default::default()
        };
        let payload_mount = VolumeMount {
            name: "payload".to_string(),
            mount_path: "/payload".to_string(),
            ..Default::default()
        };

        let bundle = helper::build_privileged_bundle(
            PrivilegedPodConfig {
                name: name.clone(),
                namespace: ns.to_string(),
                node_name: node_name.to_string(),
                container_name: "writer".to_string(),
                command: vec!["/bin/bash".to_string(), "-c".to_string(), script],
                volumes: vec![payload_vol, host_vol],
                volume_mounts: vec![payload_mount, host_mount],
                host_pid: false,
                host_network: false,
            },
            &self.get_k8s_distribution().await?,
        );

        bundle.apply(self).await?;
        debug!("Created privileged pod bundle '{name}'");

        let result = self.wait_for_pod_completion(&name, ns).await;

        // Best-effort cleanup; failures here must not mask the pod result.
        debug!("Cleaning up '{name}'");
        let _ = bundle.delete(self).await;
        let _ = cm_api.delete(&name, &DeleteParams::default()).await;

        result
    }

    /// Run a privileged command on a node via an ephemeral pod.
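    ///
    /// The command runs through `bash -c` in a privileged, host-PID,
    /// host-network pod pinned to the node, and the helper's captured output
    /// is returned. A minimal sketch (doc example; the crate path is an
    /// assumption):
    ///
    /// ```no_run
    /// # async fn demo(client: &harmony_k8s::client::K8sClient) -> Result<(), kube::Error> {
    /// let uname = client
    ///     .run_privileged_command_on_node("worker-1", "uname -a")
    ///     .await?;
    /// println!("{uname}");
    /// # Ok(())
    /// # }
    /// ```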
    pub async fn run_privileged_command_on_node(
        &self,
        node_name: &str,
        command: &str,
    ) -> Result<String, Error> {
        let namespace = self.client.default_namespace();
        let suffix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis();
        let name = format!("harmony-k8s-cmd-{suffix}");

        debug!("Running privileged command on '{node_name}': {command}");

        let (host_vol, host_mount) = helper::host_root_volume();
        let bundle = helper::build_privileged_bundle(
            PrivilegedPodConfig {
                name: name.clone(),
                namespace: namespace.to_string(),
                node_name: node_name.to_string(),
                container_name: "runner".to_string(),
                command: vec![
                    "/bin/bash".to_string(),
                    "-c".to_string(),
                    command.to_string(),
                ],
                volumes: vec![host_vol],
                volume_mounts: vec![host_mount],
                host_pid: true,
                host_network: true,
            },
            &self.get_k8s_distribution().await?,
        );

        bundle.apply(self).await?;
        debug!("Privileged pod '{name}' created");

        let result = self.wait_for_pod_completion(&name, namespace).await;

        debug!("Cleaning up '{name}'");
        let _ = bundle.delete(self).await;

        result
    }
}

// ── Tests ────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use k8s_openapi::api::core::v1::{EmptyDirVolumeSource, PodSpec, PodStatus, Volume};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};

    use super::*;

    fn base_pod(name: &str, ns: &str) -> Pod {
        Pod {
            metadata: ObjectMeta {
                name: Some(name.to_string()),
                namespace: Some(ns.to_string()),
                ..Default::default()
            },
            spec: Some(PodSpec::default()),
            status: Some(PodStatus {
                phase: Some("Running".to_string()),
                ..Default::default()
            }),
        }
    }

    fn mirror_pod(name: &str, ns: &str) -> Pod {
        let mut pod = base_pod(name, ns);
        pod.metadata.annotations = Some(std::collections::BTreeMap::from([(
            "kubernetes.io/config.mirror".to_string(),
            "abc123".to_string(),
        )]));
        pod
    }

    fn daemonset_pod(name: &str, ns: &str) -> Pod {
        let mut pod = base_pod(name, ns);
        pod.metadata.owner_references = Some(vec![OwnerReference {
            api_version: "apps/v1".to_string(),
            kind: "DaemonSet".to_string(),
            name: "some-ds".to_string(),
            uid: "uid-ds".to_string(),
            ..Default::default()
        }]);
        pod
    }

    fn emptydir_pod(name: &str, ns: &str) -> Pod {
        let mut pod = base_pod(name, ns);
        pod.spec = Some(PodSpec {
            volumes: Some(vec![Volume {
                name: "scratch".to_string(),
                empty_dir: Some(EmptyDirVolumeSource::default()),
                ..Default::default()
            }]),
            ..Default::default()
        });
        pod
    }

    fn completed_pod(name: &str, ns: &str, phase: &str) -> Pod {
        let mut pod = base_pod(name, ns);
        pod.status = Some(PodStatus {
            phase: Some(phase.to_string()),
            ..Default::default()
        });
        pod
    }

    fn default_opts() -> DrainOptions {
        DrainOptions::default()
    }

    // All test bodies are identical to the original — only the module path changed.

    #[test]
    fn empty_pod_list_returns_empty_vecs() {
        let (e, s) = K8sClient::classify_pods_for_drain(&[], &default_opts()).unwrap();
        assert!(e.is_empty());
        assert!(s.is_empty());
    }

    #[test]
    fn normal_pod_is_evictable() {
        let pods = vec![base_pod("web", "default")];
        let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
        assert_eq!(e.len(), 1);
        assert!(s.is_empty());
    }

    #[test]
    fn mirror_pod_is_skipped() {
        let pods = vec![mirror_pod("kube-apiserver", "kube-system")];
        let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
        assert!(e.is_empty());
        assert!(s[0].contains("mirror pod"));
    }

    #[test]
    fn completed_pods_are_skipped() {
        for phase in ["Succeeded", "Failed"] {
            let pods = vec![completed_pod("job", "batch", phase)];
            let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
            assert!(e.is_empty());
            assert!(s[0].contains("completed"));
        }
    }

    #[test]
    fn daemonset_skipped_when_ignored() {
        let pods = vec![daemonset_pod("fluentd", "logging")];
        let opts = DrainOptions {
            ignore_daemonsets: true,
            ..default_opts()
        };
        let (e, s) = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap();
        assert!(e.is_empty());
        assert!(s[0].contains("DaemonSet-managed"));
    }

    #[test]
    fn daemonset_blocks_when_not_ignored() {
        let pods = vec![daemonset_pod("fluentd", "logging")];
        let opts = DrainOptions {
            ignore_daemonsets: false,
            ..default_opts()
        };
        let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
        assert!(err.contains("DaemonSet") && err.contains("logging/fluentd"));
    }

    #[test]
    fn emptydir_blocks_without_flag() {
        let pods = vec![emptydir_pod("cache", "default")];
        let opts = DrainOptions {
            delete_emptydir_data: false,
            ..default_opts()
        };
        let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
        assert!(err.contains("emptyDir") && err.contains("default/cache"));
    }

    #[test]
    fn emptydir_evictable_with_flag() {
        let pods = vec![emptydir_pod("cache", "default")];
        let opts = DrainOptions {
            delete_emptydir_data: true,
            ..default_opts()
        };
        let (e, s) = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap();
        assert_eq!(e.len(), 1);
        assert!(s.is_empty());
    }

    #[test]
    fn multiple_blocking_all_reported() {
        let pods = vec![daemonset_pod("ds", "ns1"), emptydir_pod("ed", "ns2")];
        let opts = DrainOptions {
            ignore_daemonsets: false,
            delete_emptydir_data: false,
            ..default_opts()
        };
        let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
        assert!(err.contains("ns1/ds") && err.contains("ns2/ed"));
    }

    #[test]
    fn mixed_pods_classified_correctly() {
        let pods = vec![
            base_pod("web", "default"),
            mirror_pod("kube-apiserver", "kube-system"),
            daemonset_pod("fluentd", "logging"),
            completed_pod("job", "batch", "Succeeded"),
            base_pod("api", "default"),
        ];
        let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
        let names: Vec<&str> = e
            .iter()
            .map(|p| p.metadata.name.as_deref().unwrap())
            .collect();
        assert_eq!(names, vec!["web", "api"]);
        assert_eq!(s.len(), 3);
    }

    #[test]
    fn mirror_checked_before_completed() {
        let mut pod = mirror_pod("static-etcd", "kube-system");
        pod.status = Some(PodStatus {
            phase: Some("Succeeded".to_string()),
            ..Default::default()
        });
        let (_, s) = K8sClient::classify_pods_for_drain(&[pod], &default_opts()).unwrap();
        assert!(s[0].contains("mirror pod"), "got: {}", s[0]);
    }

    #[test]
    fn completed_checked_before_daemonset() {
        let mut pod = daemonset_pod("collector", "monitoring");
        pod.status = Some(PodStatus {
            phase: Some("Failed".to_string()),
            ..Default::default()
        });
        let (_, s) = K8sClient::classify_pods_for_drain(&[pod], &default_opts()).unwrap();
        assert!(s[0].contains("completed"), "got: {}", s[0]);
    }
}