feat(k8s): Can now apply resources of any scope. Kind of a hack leveraging the dynamic type under the hood but this is due to a limitation of kube-rs #241

Merged
stremblay merged 1 commits from feat/k8s_apply_any_scope into master 2026-03-03 20:06:04 +00:00
2 changed files with 51 additions and 9 deletions

View File

@@ -10,8 +10,10 @@ use k8s_openapi::api::core::v1::{
}; };
use k8s_openapi::api::rbac::v1::{ClusterRoleBinding, RoleRef, Subject}; use k8s_openapi::api::rbac::v1::{ClusterRoleBinding, RoleRef, Subject};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::api::DynamicObject;
use kube::error::DiscoveryError; use kube::error::DiscoveryError;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use serde::de::DeserializeOwned;
#[derive(Debug)] #[derive(Debug)]
pub struct PrivilegedPodConfig { pub struct PrivilegedPodConfig {
@@ -279,6 +281,16 @@ pub fn prompt_drain_timeout_action(
} }
} }
/// JSON round-trip: DynamicObject → K
///
/// Safe because the DynamicObject was produced by the apiserver from a
/// payload that was originally serialized from K, so the schema is identical.
pub(crate) fn dyn_to_typed<K: DeserializeOwned>(obj: DynamicObject) -> Result<K, kube::Error> {
serde_json::to_value(obj)
.and_then(serde_json::from_value)
.map_err(kube::Error::SerdeError)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@@ -832,7 +832,6 @@ impl K8sClient {
pub async fn apply<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error> pub async fn apply<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
where where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize, K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
<K as Resource>::Scope: ApplyStrategy<K>,
<K as kube::Resource>::DynamicType: Default, <K as kube::Resource>::DynamicType: Default,
{ {
debug!( debug!(
@@ -845,9 +844,34 @@ impl K8sClient {
serde_json::to_value(resource).unwrap_or(serde_json::Value::Null) serde_json::to_value(resource).unwrap_or(serde_json::Value::Null)
); );
let api: Api<K> = // ── 1. Extract GVK from compile-time type info ──────────────────────────
<<K as Resource>::Scope as ApplyStrategy<K>>::get_api(&self.client, namespace); let dyntype = K::DynamicType::default();
// api.create(&PostParams::default(), &resource).await let gvk = GroupVersionKind {
group: K::group(&dyntype).to_string(),
version: K::version(&dyntype).to_string(),
kind: K::kind(&dyntype).to_string(),
};
// ── 2. Resolve scope at runtime via discovery ────────────────────────────
let discovery = self.discovery().await?;
let (ar, caps) = discovery.resolve_gvk(&gvk).ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Cannot resolve GVK: {:?}",
gvk
)))
})?;
let effective_namespace = if caps.scope == Scope::Cluster {
None
} else {
// Prefer the caller-supplied namespace, fall back to the resource's own
namespace.or_else(|| resource.meta().namespace.as_deref())
};
// ── 3. Determine the effective namespace based on the discovered scope ───
let api: Api<DynamicObject> =
get_dynamic_api(ar, caps, self.client.clone(), effective_namespace, false);
let patch_params = PatchParams::apply("harmony"); let patch_params = PatchParams::apply("harmony");
let name = resource let name = resource
.meta() .meta()
@@ -883,7 +907,7 @@ impl K8sClient {
if current_yaml == new_yaml { if current_yaml == new_yaml {
println!("No changes detected."); println!("No changes detected.");
// Return the current resource state as there are no changes. // Return the current resource state as there are no changes.
return Ok(current); return helper::dyn_to_typed(current);
} }
println!("Changes detected:"); println!("Changes detected:");
@@ -930,13 +954,19 @@ impl K8sClient {
.patch(name, &patch_params, &Patch::Apply(resource)) .patch(name, &patch_params, &Patch::Apply(resource))
.await .await
{ {
Ok(obj) => Ok(obj), Ok(obj) => helper::dyn_to_typed(obj),
Err(Error::Api(ErrorResponse { code: 404, .. })) => { Err(Error::Api(ErrorResponse { code: 404, .. })) => {
// Resource doesn't exist, server-side apply should create it // Resource doesn't exist, server-side apply should create it
// This can happen with some API servers, so we explicitly create // This can happen with some API servers, so we explicitly create
debug!("Resource '{}' not found, creating via POST", name); debug!("Resource '{}' not found, creating via POST", name);
api.create(&PostParams::default(), resource) let dyn_resource: DynamicObject = serde_json::from_value(
serde_json::to_value(resource).map_err(Error::SerdeError)?,
)
.map_err(Error::SerdeError)?;
api.create(&PostParams::default(), &dyn_resource)
.await .await
.and_then(helper::dyn_to_typed)
.map_err(|e| { .map_err(|e| {
error!("Failed to create resource '{}': {}", name, e); error!("Failed to create resource '{}': {}", name, e);
e e
@@ -1907,7 +1937,7 @@ impl K8sClient {
); );
// ── 3. Evict & wait loop ────────────────────────────────────── // ── 3. Evict & wait loop ──────────────────────────────────────
let start = tokio::time::Instant::now(); let mut start = tokio::time::Instant::now();
let poll_interval = Duration::from_secs(5); let poll_interval = Duration::from_secs(5);
let mut pending = evictable; let mut pending = evictable;
@@ -1990,7 +2020,7 @@ impl K8sClient {
} }
helper::DrainTimeoutAction::Retry => { helper::DrainTimeoutAction::Retry => {
// Reset the start time to retry for another full timeout period // Reset the start time to retry for another full timeout period
let start = tokio::time::Instant::now(); start = tokio::time::Instant::now();
continue; continue;
} }
helper::DrainTimeoutAction::Abort => { helper::DrainTimeoutAction::Abort => {