diff --git a/harmony/src/domain/topology/k8s/helper.rs b/harmony/src/domain/topology/k8s/helper.rs
index d5944ec..b0917f8 100644
--- a/harmony/src/domain/topology/k8s/helper.rs
+++ b/harmony/src/domain/topology/k8s/helper.rs
@@ -10,8 +10,10 @@ use k8s_openapi::api::core::v1::{
 };
 use k8s_openapi::api::rbac::v1::{ClusterRoleBinding, RoleRef, Subject};
 use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
+use kube::api::DynamicObject;
 use kube::error::DiscoveryError;
 use log::{debug, error, info, warn};
+use serde::de::DeserializeOwned;
 
 #[derive(Debug)]
 pub struct PrivilegedPodConfig {
@@ -279,6 +281,16 @@ pub fn prompt_drain_timeout_action(
     }
 }
 
+/// JSON round-trip: DynamicObject → K
+///
+/// Safe because the DynamicObject was produced by the apiserver from a
+/// payload that was originally serialized from K, so the schema is identical.
+pub(crate) fn dyn_to_typed<K: DeserializeOwned>(obj: DynamicObject) -> Result<K, kube::Error> {
+    serde_json::to_value(obj)
+        .and_then(serde_json::from_value)
+        .map_err(kube::Error::SerdeError)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/harmony/src/domain/topology/k8s/mod.rs b/harmony/src/domain/topology/k8s/mod.rs
index b12a9ed..fba42e2 100644
--- a/harmony/src/domain/topology/k8s/mod.rs
+++ b/harmony/src/domain/topology/k8s/mod.rs
@@ -832,7 +832,6 @@ impl K8sClient {
     pub async fn apply<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
     where
         K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
-        <K as Resource>::Scope: ApplyStrategy<K>,
         <K as Resource>::DynamicType: Default,
     {
         debug!(
@@ -845,9 +844,34 @@ impl K8sClient {
             serde_json::to_value(resource).unwrap_or(serde_json::Value::Null)
         );
 
-        let api: Api<K> =
-            <<K as Resource>::Scope as ApplyStrategy<K>>::get_api(&self.client, namespace);
-        // api.create(&PostParams::default(), &resource).await
+        // ── 1. 
Extract GVK from compile-time type info ──────────────────────────
+        let dyntype = K::DynamicType::default();
+        let gvk = GroupVersionKind {
+            group: K::group(&dyntype).to_string(),
+            version: K::version(&dyntype).to_string(),
+            kind: K::kind(&dyntype).to_string(),
+        };
+
+        // ── 2. Resolve scope at runtime via discovery ────────────────────────────
+        let discovery = self.discovery().await?;
+        let (ar, caps) = discovery.resolve_gvk(&gvk).ok_or_else(|| {
+            Error::Discovery(DiscoveryError::MissingResource(format!(
+                "Cannot resolve GVK: {:?}",
+                gvk
+            )))
+        })?;
+
+        // ── 3. Determine the effective namespace based on the discovered scope ───
+        let effective_namespace = if caps.scope == Scope::Cluster {
+            None
+        } else {
+            // Prefer the caller-supplied namespace, fall back to the resource's own
+            namespace.or_else(|| resource.meta().namespace.as_deref())
+        };
+
+        let api: Api<DynamicObject> =
+            get_dynamic_api(ar, caps, self.client.clone(), effective_namespace, false);
+
         let patch_params = PatchParams::apply("harmony");
         let name = resource
             .meta()
@@ -883,7 +907,7 @@ impl K8sClient {
         if current_yaml == new_yaml {
             println!("No changes detected.");
             // Return the current resource state as there are no changes.
-            return Ok(current);
+            return helper::dyn_to_typed(current);
         }
 
         println!("Changes detected:");
@@ -930,13 +954,19 @@ impl K8sClient {
             .patch(name, &patch_params, &Patch::Apply(resource))
             .await
         {
-            Ok(obj) => Ok(obj),
+            Ok(obj) => helper::dyn_to_typed(obj),
             Err(Error::Api(ErrorResponse { code: 404, .. 
})) => { // Resource doesn't exist, server-side apply should create it // This can happen with some API servers, so we explicitly create debug!("Resource '{}' not found, creating via POST", name); - api.create(&PostParams::default(), resource) + let dyn_resource: DynamicObject = serde_json::from_value( + serde_json::to_value(resource).map_err(Error::SerdeError)?, + ) + .map_err(Error::SerdeError)?; + + api.create(&PostParams::default(), &dyn_resource) .await + .and_then(helper::dyn_to_typed) .map_err(|e| { error!("Failed to create resource '{}': {}", name, e); e @@ -1907,7 +1937,7 @@ impl K8sClient { ); // ── 3. Evict & wait loop ────────────────────────────────────── - let start = tokio::time::Instant::now(); + let mut start = tokio::time::Instant::now(); let poll_interval = Duration::from_secs(5); let mut pending = evictable; @@ -1990,7 +2020,7 @@ impl K8sClient { } helper::DrainTimeoutAction::Retry => { // Reset the start time to retry for another full timeout period - let start = tokio::time::Instant::now(); + start = tokio::time::Instant::now(); continue; } helper::DrainTimeoutAction::Abort => {