feat(k8s): Can now apply resources of any scope. Kind of a hack leveraging the dynamic type under the hood but this is due to a limitation of kube-rs #241
@@ -10,8 +10,10 @@ use k8s_openapi::api::core::v1::{
|
|||||||
};
|
};
|
||||||
use k8s_openapi::api::rbac::v1::{ClusterRoleBinding, RoleRef, Subject};
|
use k8s_openapi::api::rbac::v1::{ClusterRoleBinding, RoleRef, Subject};
|
||||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||||
|
use kube::api::DynamicObject;
|
||||||
use kube::error::DiscoveryError;
|
use kube::error::DiscoveryError;
|
||||||
use log::{debug, error, info, warn};
|
use log::{debug, error, info, warn};
|
||||||
|
use serde::de::DeserializeOwned;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct PrivilegedPodConfig {
|
pub struct PrivilegedPodConfig {
|
||||||
@@ -279,6 +281,16 @@ pub fn prompt_drain_timeout_action(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// JSON round-trip: DynamicObject → K
|
||||||
|
///
|
||||||
|
/// Safe because the DynamicObject was produced by the apiserver from a
|
||||||
|
/// payload that was originally serialized from K, so the schema is identical.
|
||||||
|
pub(crate) fn dyn_to_typed<K: DeserializeOwned>(obj: DynamicObject) -> Result<K, kube::Error> {
|
||||||
|
serde_json::to_value(obj)
|
||||||
|
.and_then(serde_json::from_value)
|
||||||
|
.map_err(kube::Error::SerdeError)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
@@ -832,7 +832,6 @@ impl K8sClient {
|
|||||||
pub async fn apply<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
|
pub async fn apply<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
|
||||||
where
|
where
|
||||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
|
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
|
||||||
<K as Resource>::Scope: ApplyStrategy<K>,
|
|
||||||
<K as kube::Resource>::DynamicType: Default,
|
<K as kube::Resource>::DynamicType: Default,
|
||||||
{
|
{
|
||||||
debug!(
|
debug!(
|
||||||
@@ -845,9 +844,34 @@ impl K8sClient {
|
|||||||
serde_json::to_value(resource).unwrap_or(serde_json::Value::Null)
|
serde_json::to_value(resource).unwrap_or(serde_json::Value::Null)
|
||||||
);
|
);
|
||||||
|
|
||||||
let api: Api<K> =
|
// ── 1. Extract GVK from compile-time type info ──────────────────────────
|
||||||
<<K as Resource>::Scope as ApplyStrategy<K>>::get_api(&self.client, namespace);
|
let dyntype = K::DynamicType::default();
|
||||||
// api.create(&PostParams::default(), &resource).await
|
let gvk = GroupVersionKind {
|
||||||
|
group: K::group(&dyntype).to_string(),
|
||||||
|
version: K::version(&dyntype).to_string(),
|
||||||
|
kind: K::kind(&dyntype).to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// ── 2. Resolve scope at runtime via discovery ────────────────────────────
|
||||||
|
let discovery = self.discovery().await?;
|
||||||
|
let (ar, caps) = discovery.resolve_gvk(&gvk).ok_or_else(|| {
|
||||||
|
Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||||
|
"Cannot resolve GVK: {:?}",
|
||||||
|
gvk
|
||||||
|
)))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let effective_namespace = if caps.scope == Scope::Cluster {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
// Prefer the caller-supplied namespace, fall back to the resource's own
|
||||||
|
namespace.or_else(|| resource.meta().namespace.as_deref())
|
||||||
|
};
|
||||||
|
|
||||||
|
// ── 3. Determine the effective namespace based on the discovered scope ───
|
||||||
|
let api: Api<DynamicObject> =
|
||||||
|
get_dynamic_api(ar, caps, self.client.clone(), effective_namespace, false);
|
||||||
|
|
||||||
let patch_params = PatchParams::apply("harmony");
|
let patch_params = PatchParams::apply("harmony");
|
||||||
let name = resource
|
let name = resource
|
||||||
.meta()
|
.meta()
|
||||||
@@ -883,7 +907,7 @@ impl K8sClient {
|
|||||||
if current_yaml == new_yaml {
|
if current_yaml == new_yaml {
|
||||||
println!("No changes detected.");
|
println!("No changes detected.");
|
||||||
// Return the current resource state as there are no changes.
|
// Return the current resource state as there are no changes.
|
||||||
return Ok(current);
|
return helper::dyn_to_typed(current);
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("Changes detected:");
|
println!("Changes detected:");
|
||||||
@@ -930,13 +954,19 @@ impl K8sClient {
|
|||||||
.patch(name, &patch_params, &Patch::Apply(resource))
|
.patch(name, &patch_params, &Patch::Apply(resource))
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(obj) => Ok(obj),
|
Ok(obj) => helper::dyn_to_typed(obj),
|
||||||
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
|
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
|
||||||
// Resource doesn't exist, server-side apply should create it
|
// Resource doesn't exist, server-side apply should create it
|
||||||
// This can happen with some API servers, so we explicitly create
|
// This can happen with some API servers, so we explicitly create
|
||||||
debug!("Resource '{}' not found, creating via POST", name);
|
debug!("Resource '{}' not found, creating via POST", name);
|
||||||
api.create(&PostParams::default(), resource)
|
let dyn_resource: DynamicObject = serde_json::from_value(
|
||||||
|
serde_json::to_value(resource).map_err(Error::SerdeError)?,
|
||||||
|
)
|
||||||
|
.map_err(Error::SerdeError)?;
|
||||||
|
|
||||||
|
api.create(&PostParams::default(), &dyn_resource)
|
||||||
.await
|
.await
|
||||||
|
.and_then(helper::dyn_to_typed)
|
||||||
.map_err(|e| {
|
.map_err(|e| {
|
||||||
error!("Failed to create resource '{}': {}", name, e);
|
error!("Failed to create resource '{}': {}", name, e);
|
||||||
e
|
e
|
||||||
@@ -1907,7 +1937,7 @@ impl K8sClient {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// ── 3. Evict & wait loop ──────────────────────────────────────
|
// ── 3. Evict & wait loop ──────────────────────────────────────
|
||||||
let start = tokio::time::Instant::now();
|
let mut start = tokio::time::Instant::now();
|
||||||
let poll_interval = Duration::from_secs(5);
|
let poll_interval = Duration::from_secs(5);
|
||||||
let mut pending = evictable;
|
let mut pending = evictable;
|
||||||
|
|
||||||
@@ -1990,7 +2020,7 @@ impl K8sClient {
|
|||||||
}
|
}
|
||||||
helper::DrainTimeoutAction::Retry => {
|
helper::DrainTimeoutAction::Retry => {
|
||||||
// Reset the start time to retry for another full timeout period
|
// Reset the start time to retry for another full timeout period
|
||||||
let start = tokio::time::Instant::now();
|
start = tokio::time::Instant::now();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
helper::DrainTimeoutAction::Abort => {
|
helper::DrainTimeoutAction::Abort => {
|
||||||
|
|||||||
Reference in New Issue
Block a user