364 lines
13 KiB
Rust
364 lines
13 KiB
Rust
use derive_new::new;
|
|
use k8s_openapi::{
|
|
ClusterResourceScope, NamespaceResourceScope,
|
|
api::{apps::v1::Deployment, core::v1::Pod},
|
|
};
|
|
use kube::{
|
|
Client, Config, Error, Resource,
|
|
api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
|
|
config::{KubeConfigOptions, Kubeconfig},
|
|
core::ErrorResponse,
|
|
runtime::reflector::Lookup,
|
|
};
|
|
use kube::{api::DynamicObject, runtime::conditions};
|
|
use kube::{
|
|
api::{ApiResource, GroupVersionKind},
|
|
runtime::wait::await_condition,
|
|
};
|
|
use log::{debug, error, trace};
|
|
use serde::{Serialize, de::DeserializeOwned};
|
|
use similar::TextDiff;
|
|
|
|
#[derive(new, Clone)]
|
|
pub struct K8sClient {
|
|
client: Client,
|
|
}
|
|
|
|
impl Serialize for K8sClient {
|
|
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
|
|
where
|
|
S: serde::Serializer,
|
|
{
|
|
todo!()
|
|
}
|
|
}
|
|
|
|
impl std::fmt::Debug for K8sClient {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
// This is a poor man's debug implementation for now as kube::Client does not provide much
|
|
// useful information
|
|
f.write_fmt(format_args!(
|
|
"K8sClient {{ kube client using default namespace {} }}",
|
|
self.client.default_namespace()
|
|
))
|
|
}
|
|
}
|
|
|
|
impl K8sClient {
|
|
pub async fn try_default() -> Result<Self, Error> {
|
|
Ok(Self {
|
|
client: Client::try_default().await?,
|
|
})
|
|
}
|
|
|
|
pub async fn wait_until_deployment_ready(
|
|
&self,
|
|
name: String,
|
|
namespace: Option<&str>,
|
|
timeout: Option<u64>,
|
|
) -> Result<(), String> {
|
|
let api: Api<Deployment>;
|
|
|
|
if let Some(ns) = namespace {
|
|
api = Api::namespaced(self.client.clone(), ns);
|
|
} else {
|
|
api = Api::default_namespaced(self.client.clone());
|
|
}
|
|
|
|
let establish = await_condition(api, name.as_str(), conditions::is_deployment_completed());
|
|
let t = timeout.unwrap_or(300);
|
|
let res = tokio::time::timeout(std::time::Duration::from_secs(t), establish).await;
|
|
|
|
if res.is_ok() {
|
|
Ok(())
|
|
} else {
|
|
Err("timed out while waiting for deployment".to_string())
|
|
}
|
|
}
|
|
|
|
/// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
|
|
pub async fn exec_app(
|
|
&self,
|
|
name: String,
|
|
namespace: Option<&str>,
|
|
command: Vec<&str>,
|
|
) -> Result<(), String> {
|
|
let api: Api<Pod>;
|
|
|
|
if let Some(ns) = namespace {
|
|
api = Api::namespaced(self.client.clone(), ns);
|
|
} else {
|
|
api = Api::default_namespaced(self.client.clone());
|
|
}
|
|
let pod_list = api
|
|
.list(&ListParams::default().labels(format!("app.kubernetes.io/name={name}").as_str()))
|
|
.await
|
|
.expect("couldn't get list of pods");
|
|
|
|
let res = api
|
|
.exec(
|
|
pod_list
|
|
.items
|
|
.first()
|
|
.expect("couldn't get pod")
|
|
.name()
|
|
.expect("couldn't get pod name")
|
|
.into_owned()
|
|
.as_str(),
|
|
command,
|
|
&AttachParams::default(),
|
|
)
|
|
.await;
|
|
|
|
match res {
|
|
Err(e) => Err(e.to_string()),
|
|
Ok(mut process) => {
|
|
let status = process
|
|
.take_status()
|
|
.expect("Couldn't get status")
|
|
.await
|
|
.expect("Couldn't unwrap status");
|
|
|
|
if let Some(s) = status.status {
|
|
debug!("Status: {} - {:?}", s, status.details);
|
|
if s == "Success" { Ok(()) } else { Err(s) }
|
|
} else {
|
|
Err("Couldn't get inner status of pod exec".to_string())
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Apply a resource in namespace
|
|
///
|
|
/// See `kubectl apply` for more information on the expected behavior of this function
|
|
pub async fn apply<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
|
|
where
|
|
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
|
|
<K as Resource>::Scope: ApplyStrategy<K>,
|
|
<K as kube::Resource>::DynamicType: Default,
|
|
{
|
|
debug!(
|
|
"Applying resource {:?} with ns {:?}",
|
|
resource.meta().name,
|
|
namespace
|
|
);
|
|
trace!(
|
|
"{:#}",
|
|
serde_json::to_value(resource).unwrap_or(serde_json::Value::Null)
|
|
);
|
|
|
|
let api: Api<K> =
|
|
<<K as Resource>::Scope as ApplyStrategy<K>>::get_api(&self.client, namespace);
|
|
// api.create(&PostParams::default(), &resource).await
|
|
let patch_params = PatchParams::apply("harmony");
|
|
let name = resource
|
|
.meta()
|
|
.name
|
|
.as_ref()
|
|
.expect("K8s Resource should have a name");
|
|
|
|
if *crate::config::DRY_RUN {
|
|
match api.get(name).await {
|
|
Ok(current) => {
|
|
trace!("Received current value {current:#?}");
|
|
// The resource exists, so we calculate and display a diff.
|
|
println!("\nPerforming dry-run for resource: '{}'", name);
|
|
let mut current_yaml = serde_yaml::to_value(¤t).unwrap_or_else(|_| {
|
|
panic!("Could not serialize current value : {current:#?}")
|
|
});
|
|
if current_yaml.is_mapping() && current_yaml.get("status").is_some() {
|
|
let map = current_yaml.as_mapping_mut().unwrap();
|
|
let removed = map.remove_entry("status");
|
|
trace!("Removed status {:?}", removed);
|
|
} else {
|
|
trace!(
|
|
"Did not find status entry for current object {}/{}",
|
|
current.meta().namespace.as_ref().unwrap_or(&"".to_string()),
|
|
current.meta().name.as_ref().unwrap_or(&"".to_string())
|
|
);
|
|
}
|
|
let current_yaml = serde_yaml::to_string(¤t_yaml)
|
|
.unwrap_or_else(|_| "Failed to serialize current resource".to_string());
|
|
let new_yaml = serde_yaml::to_string(resource)
|
|
.unwrap_or_else(|_| "Failed to serialize new resource".to_string());
|
|
|
|
if current_yaml == new_yaml {
|
|
println!("No changes detected.");
|
|
// Return the current resource state as there are no changes.
|
|
return Ok(current);
|
|
}
|
|
|
|
println!("Changes detected:");
|
|
let diff = TextDiff::from_lines(¤t_yaml, &new_yaml);
|
|
|
|
// Iterate over the changes and print them in a git-like diff format.
|
|
for change in diff.iter_all_changes() {
|
|
let sign = match change.tag() {
|
|
similar::ChangeTag::Delete => "-",
|
|
similar::ChangeTag::Insert => "+",
|
|
similar::ChangeTag::Equal => " ",
|
|
};
|
|
print!("{}{}", sign, change);
|
|
}
|
|
// In a dry run, we return the new resource state that would have been applied.
|
|
Ok(resource.clone())
|
|
}
|
|
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
|
|
// The resource does not exist, so the "diff" is the entire new resource.
|
|
println!("\nPerforming dry-run for new resource: '{}'", name);
|
|
println!(
|
|
"Resource does not exist. It would be created with the following content:"
|
|
);
|
|
let new_yaml = serde_yaml::to_string(resource)
|
|
.unwrap_or_else(|_| "Failed to serialize new resource".to_string());
|
|
|
|
// Print each line of the new resource with a '+' prefix.
|
|
for line in new_yaml.lines() {
|
|
println!("+{}", line);
|
|
}
|
|
// In a dry run, we return the new resource state that would have been created.
|
|
Ok(resource.clone())
|
|
}
|
|
Err(e) => {
|
|
// Another API error occurred.
|
|
error!("Failed to get resource '{}': {}", name, e);
|
|
Err(e)
|
|
}
|
|
}
|
|
} else {
|
|
return api
|
|
.patch(name, &patch_params, &Patch::Apply(resource))
|
|
.await;
|
|
}
|
|
}
|
|
|
|
pub async fn apply_many<K>(&self, resource: &[K], ns: Option<&str>) -> Result<Vec<K>, Error>
|
|
where
|
|
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
|
|
<K as Resource>::Scope: ApplyStrategy<K>,
|
|
<K as kube::Resource>::DynamicType: Default,
|
|
{
|
|
let mut result = Vec::new();
|
|
for r in resource.iter() {
|
|
result.push(self.apply(r, ns).await?);
|
|
}
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
pub async fn apply_yaml_many(
|
|
&self,
|
|
#[allow(clippy::ptr_arg)] yaml: &Vec<serde_yaml::Value>,
|
|
ns: Option<&str>,
|
|
) -> Result<(), Error> {
|
|
for y in yaml.iter() {
|
|
self.apply_yaml(y, ns).await?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn apply_yaml(
|
|
&self,
|
|
yaml: &serde_yaml::Value,
|
|
ns: Option<&str>,
|
|
) -> Result<(), Error> {
|
|
let obj: DynamicObject = serde_yaml::from_value(yaml.clone()).expect("TODO do not unwrap");
|
|
let name = obj.metadata.name.as_ref().expect("YAML must have a name");
|
|
|
|
let api_version = yaml
|
|
.get("apiVersion")
|
|
.expect("couldn't get apiVersion from YAML")
|
|
.as_str()
|
|
.expect("couldn't get apiVersion as str");
|
|
let kind = yaml
|
|
.get("kind")
|
|
.expect("couldn't get kind from YAML")
|
|
.as_str()
|
|
.expect("couldn't get kind as str");
|
|
|
|
let split: Vec<&str> = api_version.splitn(2, "/").collect();
|
|
let g = split[0];
|
|
let v = split[1];
|
|
|
|
let gvk = GroupVersionKind::gvk(g, v, kind);
|
|
let api_resource = ApiResource::from_gvk(&gvk);
|
|
|
|
let namespace = match ns {
|
|
Some(n) => n,
|
|
None => obj
|
|
.metadata
|
|
.namespace
|
|
.as_ref()
|
|
.expect("YAML must have a namespace"),
|
|
};
|
|
|
|
// 5. Create a dynamic API client for this resource type.
|
|
let api: Api<DynamicObject> =
|
|
Api::namespaced_with(self.client.clone(), namespace, &api_resource);
|
|
|
|
// 6. Apply the object to the cluster using Server-Side Apply.
|
|
// This will create the resource if it doesn't exist, or update it if it does.
|
|
println!(
|
|
"Applying Argo Application '{}' in namespace '{}'...",
|
|
name, namespace
|
|
);
|
|
let patch_params = PatchParams::apply("harmony"); // Use a unique field manager name
|
|
let result = api.patch(name, &patch_params, &Patch::Apply(&obj)).await?;
|
|
|
|
println!("Successfully applied '{}'.", result.name_any());
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn from_kubeconfig(path: &str) -> Option<K8sClient> {
|
|
let k = match Kubeconfig::read_from(path) {
|
|
Ok(k) => k,
|
|
Err(e) => {
|
|
error!("Failed to load kubeconfig from {path} : {e}");
|
|
return None;
|
|
}
|
|
};
|
|
Some(K8sClient::new(
|
|
Client::try_from(
|
|
Config::from_custom_kubeconfig(k, &KubeConfigOptions::default())
|
|
.await
|
|
.unwrap(),
|
|
)
|
|
.unwrap(),
|
|
))
|
|
}
|
|
}
|
|
|
|
pub trait ApplyStrategy<K: Resource> {
|
|
fn get_api(client: &Client, ns: Option<&str>) -> Api<K>;
|
|
}
|
|
|
|
/// Implementation for all resources that are cluster-scoped.
|
|
/// It will always use `Api::all` and ignore the namespace parameter.
|
|
impl<K> ApplyStrategy<K> for ClusterResourceScope
|
|
where
|
|
K: Resource<Scope = ClusterResourceScope>,
|
|
<K as kube::Resource>::DynamicType: Default,
|
|
{
|
|
fn get_api(client: &Client, _ns: Option<&str>) -> Api<K> {
|
|
Api::all(client.clone())
|
|
}
|
|
}
|
|
|
|
/// Implementation for all resources that are namespace-scoped.
|
|
/// It will use `Api::namespaced` if a namespace is provided, otherwise
|
|
/// it falls back to the default namespace configured in your kubeconfig.
|
|
impl<K> ApplyStrategy<K> for NamespaceResourceScope
|
|
where
|
|
K: Resource<Scope = NamespaceResourceScope>,
|
|
<K as kube::Resource>::DynamicType: Default,
|
|
{
|
|
fn get_api(client: &Client, ns: Option<&str>) -> Api<K> {
|
|
match ns {
|
|
Some(ns) => Api::namespaced(client.clone(), ns),
|
|
None => Api::default_namespaced(client.clone()),
|
|
}
|
|
}
|
|
}
|