feat(k8s): Can now apply resources of any scope. Kind of a hack leveraging the dynamic type under the hood but this is due to a limitation of kube-rs #241
@@ -10,8 +10,10 @@ use k8s_openapi::api::core::v1::{
|
||||
};
|
||||
use k8s_openapi::api::rbac::v1::{ClusterRoleBinding, RoleRef, Subject};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||
use kube::api::DynamicObject;
|
||||
use kube::error::DiscoveryError;
|
||||
use log::{debug, error, info, warn};
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PrivilegedPodConfig {
|
||||
@@ -279,6 +281,16 @@ pub fn prompt_drain_timeout_action(
|
||||
}
|
||||
}
|
||||
|
||||
/// JSON round-trip: DynamicObject → K
|
||||
///
|
||||
/// Safe because the DynamicObject was produced by the apiserver from a
|
||||
/// payload that was originally serialized from K, so the schema is identical.
|
||||
pub(crate) fn dyn_to_typed<K: DeserializeOwned>(obj: DynamicObject) -> Result<K, kube::Error> {
|
||||
serde_json::to_value(obj)
|
||||
.and_then(serde_json::from_value)
|
||||
.map_err(kube::Error::SerdeError)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
@@ -832,7 +832,6 @@ impl K8sClient {
|
||||
pub async fn apply<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
|
||||
<K as Resource>::Scope: ApplyStrategy<K>,
|
||||
<K as kube::Resource>::DynamicType: Default,
|
||||
{
|
||||
debug!(
|
||||
@@ -845,9 +844,34 @@ impl K8sClient {
|
||||
serde_json::to_value(resource).unwrap_or(serde_json::Value::Null)
|
||||
);
|
||||
|
||||
let api: Api<K> =
|
||||
<<K as Resource>::Scope as ApplyStrategy<K>>::get_api(&self.client, namespace);
|
||||
// api.create(&PostParams::default(), &resource).await
|
||||
// ── 1. Extract GVK from compile-time type info ──────────────────────────
|
||||
let dyntype = K::DynamicType::default();
|
||||
let gvk = GroupVersionKind {
|
||||
group: K::group(&dyntype).to_string(),
|
||||
version: K::version(&dyntype).to_string(),
|
||||
kind: K::kind(&dyntype).to_string(),
|
||||
};
|
||||
|
||||
// ── 2. Resolve scope at runtime via discovery ────────────────────────────
|
||||
let discovery = self.discovery().await?;
|
||||
let (ar, caps) = discovery.resolve_gvk(&gvk).ok_or_else(|| {
|
||||
Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Cannot resolve GVK: {:?}",
|
||||
gvk
|
||||
)))
|
||||
})?;
|
||||
|
||||
let effective_namespace = if caps.scope == Scope::Cluster {
|
||||
None
|
||||
} else {
|
||||
// Prefer the caller-supplied namespace, fall back to the resource's own
|
||||
namespace.or_else(|| resource.meta().namespace.as_deref())
|
||||
};
|
||||
|
||||
// ── 3. Determine the effective namespace based on the discovered scope ───
|
||||
let api: Api<DynamicObject> =
|
||||
get_dynamic_api(ar, caps, self.client.clone(), effective_namespace, false);
|
||||
|
||||
let patch_params = PatchParams::apply("harmony");
|
||||
let name = resource
|
||||
.meta()
|
||||
@@ -883,7 +907,7 @@ impl K8sClient {
|
||||
if current_yaml == new_yaml {
|
||||
println!("No changes detected.");
|
||||
// Return the current resource state as there are no changes.
|
||||
return Ok(current);
|
||||
return helper::dyn_to_typed(current);
|
||||
}
|
||||
|
||||
println!("Changes detected:");
|
||||
@@ -930,13 +954,19 @@ impl K8sClient {
|
||||
.patch(name, &patch_params, &Patch::Apply(resource))
|
||||
.await
|
||||
{
|
||||
Ok(obj) => Ok(obj),
|
||||
Ok(obj) => helper::dyn_to_typed(obj),
|
||||
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
|
||||
// Resource doesn't exist, server-side apply should create it
|
||||
// This can happen with some API servers, so we explicitly create
|
||||
debug!("Resource '{}' not found, creating via POST", name);
|
||||
api.create(&PostParams::default(), resource)
|
||||
let dyn_resource: DynamicObject = serde_json::from_value(
|
||||
serde_json::to_value(resource).map_err(Error::SerdeError)?,
|
||||
)
|
||||
.map_err(Error::SerdeError)?;
|
||||
|
||||
api.create(&PostParams::default(), &dyn_resource)
|
||||
.await
|
||||
.and_then(helper::dyn_to_typed)
|
||||
.map_err(|e| {
|
||||
error!("Failed to create resource '{}': {}", name, e);
|
||||
e
|
||||
@@ -1907,7 +1937,7 @@ impl K8sClient {
|
||||
);
|
||||
|
||||
// ── 3. Evict & wait loop ──────────────────────────────────────
|
||||
let start = tokio::time::Instant::now();
|
||||
let mut start = tokio::time::Instant::now();
|
||||
let poll_interval = Duration::from_secs(5);
|
||||
let mut pending = evictable;
|
||||
|
||||
@@ -1990,7 +2020,7 @@ impl K8sClient {
|
||||
}
|
||||
helper::DrainTimeoutAction::Retry => {
|
||||
// Reset the start time to retry for another full timeout period
|
||||
let start = tokio::time::Instant::now();
|
||||
start = tokio::time::Instant::now();
|
||||
continue;
|
||||
}
|
||||
helper::DrainTimeoutAction::Abort => {
|
||||
|
||||
Reference in New Issue
Block a user