use derive_new::new; use k8s_openapi::{ ClusterResourceScope, NamespaceResourceScope, api::{apps::v1::Deployment, core::v1::Pod}, }; use kube::{ Client, Config, Error, Resource, api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt}, config::{KubeConfigOptions, Kubeconfig}, core::ErrorResponse, runtime::reflector::Lookup, }; use kube::{api::DynamicObject, runtime::conditions}; use kube::{ api::{ApiResource, GroupVersionKind}, runtime::wait::await_condition, }; use log::{debug, error, trace}; use serde::{Serialize, de::DeserializeOwned}; use similar::TextDiff; #[derive(new, Clone)] pub struct K8sClient { client: Client, } impl Serialize for K8sClient { fn serialize(&self, _serializer: S) -> Result where S: serde::Serializer, { todo!() } } impl std::fmt::Debug for K8sClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { // This is a poor man's debug implementation for now as kube::Client does not provide much // useful information f.write_fmt(format_args!( "K8sClient {{ kube client using default namespace {} }}", self.client.default_namespace() )) } } impl K8sClient { pub async fn try_default() -> Result { Ok(Self { client: Client::try_default().await?, }) } pub async fn wait_until_deployment_ready( &self, name: String, namespace: Option<&str>, timeout: Option, ) -> Result<(), String> { let api: Api; if let Some(ns) = namespace { api = Api::namespaced(self.client.clone(), ns); } else { api = Api::default_namespaced(self.client.clone()); } let establish = await_condition(api, name.as_str(), conditions::is_deployment_completed()); let t = timeout.unwrap_or(300); let res = tokio::time::timeout(std::time::Duration::from_secs(t), establish).await; if res.is_ok() { Ok(()) } else { Err("timed out while waiting for deployment".to_string()) } } /// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}` pub async fn exec_app( &self, name: String, namespace: Option<&str>, command: Vec<&str>, ) -> Result<(), String> { let api: Api; if let Some(ns) = namespace { api = Api::namespaced(self.client.clone(), ns); } else { api = Api::default_namespaced(self.client.clone()); } let pod_list = api .list(&ListParams::default().labels(format!("app.kubernetes.io/name={name}").as_str())) .await .expect("couldn't get list of pods"); let res = api .exec( pod_list .items .first() .expect("couldn't get pod") .name() .expect("couldn't get pod name") .into_owned() .as_str(), command, &AttachParams::default(), ) .await; match res { Err(e) => Err(e.to_string()), Ok(mut process) => { let status = process .take_status() .expect("Couldn't get status") .await .expect("Couldn't unwrap status"); if let Some(s) = status.status { debug!("Status: {}", s); if s == "Success" { Ok(()) } else { Err(s) } } else { Err("Couldn't get inner status of pod exec".to_string()) } } } } /// Apply a resource in namespace /// /// See `kubectl apply` for more information on the expected behavior of this function pub async fn apply(&self, resource: &K, namespace: Option<&str>) -> Result where K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize, ::Scope: ApplyStrategy, ::DynamicType: Default, { debug!( "Applying resource {:?} with ns {:?}", resource.meta().name, namespace ); trace!( "{:#}", serde_json::to_value(resource).unwrap_or(serde_json::Value::Null) ); let api: Api = <::Scope as ApplyStrategy>::get_api(&self.client, namespace); // api.create(&PostParams::default(), &resource).await let patch_params = PatchParams::apply("harmony"); let name = resource .meta() .name .as_ref() .expect("K8s Resource should have a name"); if *crate::config::DRY_RUN { match api.get(name).await { Ok(current) => { trace!("Received current value {current:#?}"); // The resource exists, so we calculate and display a diff. println!("\nPerforming dry-run for resource: '{}'", name); let mut current_yaml = serde_yaml::to_value(¤t).unwrap_or_else(|_| { panic!("Could not serialize current value : {current:#?}") }); if current_yaml.is_mapping() && current_yaml.get("status").is_some() { let map = current_yaml.as_mapping_mut().unwrap(); let removed = map.remove_entry("status"); trace!("Removed status {:?}", removed); } else { trace!( "Did not find status entry for current object {}/{}", current.meta().namespace.as_ref().unwrap_or(&"".to_string()), current.meta().name.as_ref().unwrap_or(&"".to_string()) ); } let current_yaml = serde_yaml::to_string(¤t_yaml) .unwrap_or_else(|_| "Failed to serialize current resource".to_string()); let new_yaml = serde_yaml::to_string(resource) .unwrap_or_else(|_| "Failed to serialize new resource".to_string()); if current_yaml == new_yaml { println!("No changes detected."); // Return the current resource state as there are no changes. return Ok(current); } println!("Changes detected:"); let diff = TextDiff::from_lines(¤t_yaml, &new_yaml); // Iterate over the changes and print them in a git-like diff format. for change in diff.iter_all_changes() { let sign = match change.tag() { similar::ChangeTag::Delete => "-", similar::ChangeTag::Insert => "+", similar::ChangeTag::Equal => " ", }; print!("{}{}", sign, change); } // In a dry run, we return the new resource state that would have been applied. Ok(resource.clone()) } Err(Error::Api(ErrorResponse { code: 404, .. })) => { // The resource does not exist, so the "diff" is the entire new resource. println!("\nPerforming dry-run for new resource: '{}'", name); println!( "Resource does not exist. It would be created with the following content:" ); let new_yaml = serde_yaml::to_string(resource) .unwrap_or_else(|_| "Failed to serialize new resource".to_string()); // Print each line of the new resource with a '+' prefix. for line in new_yaml.lines() { println!("+{}", line); } // In a dry run, we return the new resource state that would have been created. Ok(resource.clone()) } Err(e) => { // Another API error occurred. error!("Failed to get resource '{}': {}", name, e); Err(e) } } } else { return api .patch(name, &patch_params, &Patch::Apply(resource)) .await; } } pub async fn apply_many(&self, resource: &[K], ns: Option<&str>) -> Result, Error> where K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize, ::Scope: ApplyStrategy, ::DynamicType: Default, { let mut result = Vec::new(); for r in resource.iter() { result.push(self.apply(r, ns).await?); } Ok(result) } pub async fn apply_yaml_many( &self, #[allow(clippy::ptr_arg)] yaml: &Vec, ns: Option<&str>, ) -> Result<(), Error> { for y in yaml.iter() { self.apply_yaml(y, ns).await?; } Ok(()) } pub async fn apply_yaml( &self, yaml: &serde_yaml::Value, ns: Option<&str>, ) -> Result<(), Error> { let obj: DynamicObject = serde_yaml::from_value(yaml.clone()).expect("TODO do not unwrap"); let name = obj.metadata.name.as_ref().expect("YAML must have a name"); let api_version = yaml .get("apiVersion") .expect("couldn't get apiVersion from YAML") .as_str() .expect("couldn't get apiVersion as str"); let kind = yaml .get("kind") .expect("couldn't get kind from YAML") .as_str() .expect("couldn't get kind as str"); let split: Vec<&str> = api_version.splitn(2, "/").collect(); let g = split[0]; let v = split[1]; let gvk = GroupVersionKind::gvk(g, v, kind); let api_resource = ApiResource::from_gvk(&gvk); let namespace = match ns { Some(n) => n, None => obj .metadata .namespace .as_ref() .expect("YAML must have a namespace"), }; // 5. Create a dynamic API client for this resource type. let api: Api = Api::namespaced_with(self.client.clone(), namespace, &api_resource); // 6. Apply the object to the cluster using Server-Side Apply. // This will create the resource if it doesn't exist, or update it if it does. println!( "Applying Argo Application '{}' in namespace '{}'...", name, namespace ); let patch_params = PatchParams::apply("harmony"); // Use a unique field manager name let result = api.patch(name, &patch_params, &Patch::Apply(&obj)).await?; println!("Successfully applied '{}'.", result.name_any()); Ok(()) } pub(crate) async fn from_kubeconfig(path: &str) -> Option { let k = match Kubeconfig::read_from(path) { Ok(k) => k, Err(e) => { error!("Failed to load kubeconfig from {path} : {e}"); return None; } }; Some(K8sClient::new( Client::try_from( Config::from_custom_kubeconfig(k, &KubeConfigOptions::default()) .await .unwrap(), ) .unwrap(), )) } } pub trait ApplyStrategy { fn get_api(client: &Client, ns: Option<&str>) -> Api; } /// Implementation for all resources that are cluster-scoped. /// It will always use `Api::all` and ignore the namespace parameter. impl ApplyStrategy for ClusterResourceScope where K: Resource, ::DynamicType: Default, { fn get_api(client: &Client, _ns: Option<&str>) -> Api { Api::all(client.clone()) } } /// Implementation for all resources that are namespace-scoped. /// It will use `Api::namespaced` if a namespace is provided, otherwise /// it falls back to the default namespace configured in your kubeconfig. impl ApplyStrategy for NamespaceResourceScope where K: Resource, ::DynamicType: Default, { fn get_api(client: &Client, ns: Option<&str>) -> Api { match ns { Some(ns) => Api::namespaced(client.clone(), ns), None => Api::default_namespaced(client.clone()), } } }