416 lines
14 KiB
Rust
416 lines
14 KiB
Rust
use std::collections::HashMap;
|
|
|
|
use k8s_openapi::api::{
|
|
apps::v1::Deployment,
|
|
core::v1::{Namespace, Node, ServiceAccount},
|
|
};
|
|
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
|
|
use kube::api::ApiResource;
|
|
use kube::{
|
|
Error, Resource,
|
|
api::{Api, DynamicObject, GroupVersionKind, ListParams, ObjectList},
|
|
core::ErrorResponse,
|
|
runtime::conditions,
|
|
runtime::wait::await_condition,
|
|
};
|
|
use log::debug;
|
|
use serde::de::DeserializeOwned;
|
|
use serde_json::Value;
|
|
use std::time::Duration;
|
|
|
|
use crate::client::K8sClient;
|
|
use crate::types::ScopeResolver;
|
|
|
|
impl K8sClient {
|
|
pub async fn has_healthy_deployment_with_label(
|
|
&self,
|
|
namespace: &str,
|
|
label_selector: &str,
|
|
) -> Result<bool, Error> {
|
|
let api: Api<Deployment> = Api::namespaced(self.client.clone(), namespace);
|
|
let list = api
|
|
.list(&ListParams::default().labels(label_selector))
|
|
.await?;
|
|
for d in list.items {
|
|
let available = d
|
|
.status
|
|
.as_ref()
|
|
.and_then(|s| s.available_replicas)
|
|
.unwrap_or(0);
|
|
if available > 0 {
|
|
return Ok(true);
|
|
}
|
|
if let Some(conds) = d.status.as_ref().and_then(|s| s.conditions.as_ref())
|
|
&& conds
|
|
.iter()
|
|
.any(|c| c.type_ == "Available" && c.status == "True")
|
|
{
|
|
return Ok(true);
|
|
}
|
|
}
|
|
Ok(false)
|
|
}
|
|
|
|
pub async fn list_namespaces_with_healthy_deployments(
|
|
&self,
|
|
label_selector: &str,
|
|
) -> Result<Vec<String>, Error> {
|
|
let api: Api<Deployment> = Api::all(self.client.clone());
|
|
let list = api
|
|
.list(&ListParams::default().labels(label_selector))
|
|
.await?;
|
|
|
|
let mut healthy_ns: HashMap<String, bool> = HashMap::new();
|
|
for d in list.items {
|
|
let ns = match d.metadata.namespace.clone() {
|
|
Some(n) => n,
|
|
None => continue,
|
|
};
|
|
let available = d
|
|
.status
|
|
.as_ref()
|
|
.and_then(|s| s.available_replicas)
|
|
.unwrap_or(0);
|
|
let is_healthy = if available > 0 {
|
|
true
|
|
} else {
|
|
d.status
|
|
.as_ref()
|
|
.and_then(|s| s.conditions.as_ref())
|
|
.map(|c| {
|
|
c.iter()
|
|
.any(|c| c.type_ == "Available" && c.status == "True")
|
|
})
|
|
.unwrap_or(false)
|
|
};
|
|
if is_healthy {
|
|
healthy_ns.insert(ns, true);
|
|
}
|
|
}
|
|
Ok(healthy_ns.into_keys().collect())
|
|
}
|
|
|
|
pub async fn get_controller_service_account_name(
|
|
&self,
|
|
ns: &str,
|
|
) -> Result<Option<String>, Error> {
|
|
let api: Api<Deployment> = Api::namespaced(self.client.clone(), ns);
|
|
let list = api
|
|
.list(&ListParams::default().labels("app.kubernetes.io/component=controller"))
|
|
.await?;
|
|
if let Some(dep) = list.items.first()
|
|
&& let Some(sa) = dep
|
|
.spec
|
|
.as_ref()
|
|
.and_then(|s| s.template.spec.as_ref())
|
|
.and_then(|s| s.service_account_name.clone())
|
|
{
|
|
return Ok(Some(sa));
|
|
}
|
|
Ok(None)
|
|
}
|
|
|
|
pub async fn list_clusterrolebindings_json(&self) -> Result<Vec<Value>, Error> {
|
|
let gvk = GroupVersionKind::gvk("rbac.authorization.k8s.io", "v1", "ClusterRoleBinding");
|
|
let ar = ApiResource::from_gvk(&gvk);
|
|
let api: Api<DynamicObject> = Api::all_with(self.client.clone(), &ar);
|
|
let list = api.list(&ListParams::default()).await?;
|
|
Ok(list
|
|
.items
|
|
.into_iter()
|
|
.map(|o| serde_json::to_value(&o).unwrap_or(Value::Null))
|
|
.collect())
|
|
}
|
|
|
|
pub async fn is_service_account_cluster_wide(&self, sa: &str, ns: &str) -> Result<bool, Error> {
|
|
let sa_user = format!("system:serviceaccount:{ns}:{sa}");
|
|
for crb in self.list_clusterrolebindings_json().await? {
|
|
if let Some(subjects) = crb.get("subjects").and_then(|s| s.as_array()) {
|
|
for subj in subjects {
|
|
let kind = subj.get("kind").and_then(|v| v.as_str()).unwrap_or("");
|
|
let name = subj.get("name").and_then(|v| v.as_str()).unwrap_or("");
|
|
let subj_ns = subj.get("namespace").and_then(|v| v.as_str()).unwrap_or("");
|
|
if (kind == "ServiceAccount" && name == sa && subj_ns == ns)
|
|
|| (kind == "User" && name == sa_user)
|
|
{
|
|
return Ok(true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(false)
|
|
}
|
|
|
|
pub async fn has_crd(&self, name: &str) -> Result<bool, Error> {
|
|
let api: Api<CustomResourceDefinition> = Api::all(self.client.clone());
|
|
let crds = api
|
|
.list(&ListParams::default().fields(&format!("metadata.name={name}")))
|
|
.await?;
|
|
Ok(!crds.items.is_empty())
|
|
}
|
|
|
|
/// Polls until a CRD is registered in the API server.
|
|
pub async fn wait_for_crd(&self, name: &str, timeout: Option<Duration>) -> Result<(), Error> {
|
|
let timeout = timeout.unwrap_or(Duration::from_secs(60));
|
|
let start = std::time::Instant::now();
|
|
let poll = Duration::from_secs(2);
|
|
|
|
loop {
|
|
if self.has_crd(name).await? {
|
|
return Ok(());
|
|
}
|
|
if start.elapsed() > timeout {
|
|
return Err(Error::Discovery(
|
|
kube::error::DiscoveryError::MissingResource(format!(
|
|
"CRD '{name}' not registered within {}s",
|
|
timeout.as_secs()
|
|
)),
|
|
));
|
|
}
|
|
tokio::time::sleep(poll).await;
|
|
}
|
|
}
|
|
|
|
pub async fn service_account_api(&self, namespace: &str) -> Api<ServiceAccount> {
|
|
Api::namespaced(self.client.clone(), namespace)
|
|
}
|
|
|
|
pub async fn get_resource_json_value(
|
|
&self,
|
|
name: &str,
|
|
namespace: Option<&str>,
|
|
gvk: &GroupVersionKind,
|
|
) -> Result<DynamicObject, Error> {
|
|
let ar = ApiResource::from_gvk(gvk);
|
|
let api: Api<DynamicObject> = match namespace {
|
|
Some(ns) => Api::namespaced_with(self.client.clone(), ns, &ar),
|
|
None => Api::default_namespaced_with(self.client.clone(), &ar),
|
|
};
|
|
api.get(name).await
|
|
}
|
|
|
|
pub async fn get_secret_json_value(
|
|
&self,
|
|
name: &str,
|
|
namespace: Option<&str>,
|
|
) -> Result<DynamicObject, Error> {
|
|
self.get_resource_json_value(
|
|
name,
|
|
namespace,
|
|
&GroupVersionKind {
|
|
group: String::new(),
|
|
version: "v1".to_string(),
|
|
kind: "Secret".to_string(),
|
|
},
|
|
)
|
|
.await
|
|
}
|
|
|
|
pub async fn get_deployment(
|
|
&self,
|
|
name: &str,
|
|
namespace: Option<&str>,
|
|
) -> Result<Option<Deployment>, Error> {
|
|
let api: Api<Deployment> = match namespace {
|
|
Some(ns) => {
|
|
debug!("Getting namespaced deployment '{name}' in '{ns}'");
|
|
Api::namespaced(self.client.clone(), ns)
|
|
}
|
|
None => {
|
|
debug!("Getting deployment '{name}' in default namespace");
|
|
Api::default_namespaced(self.client.clone())
|
|
}
|
|
};
|
|
api.get_opt(name).await
|
|
}
|
|
|
|
pub async fn scale_deployment(
|
|
&self,
|
|
name: &str,
|
|
namespace: Option<&str>,
|
|
replicas: u32,
|
|
) -> Result<(), Error> {
|
|
let api: Api<Deployment> = match namespace {
|
|
Some(ns) => Api::namespaced(self.client.clone(), ns),
|
|
None => Api::default_namespaced(self.client.clone()),
|
|
};
|
|
use kube::api::{Patch, PatchParams};
|
|
use serde_json::json;
|
|
let patch = json!({ "spec": { "replicas": replicas } });
|
|
api.patch_scale(name, &PatchParams::default(), &Patch::Merge(&patch))
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn delete_deployment(
|
|
&self,
|
|
name: &str,
|
|
namespace: Option<&str>,
|
|
) -> Result<(), Error> {
|
|
let api: Api<Deployment> = match namespace {
|
|
Some(ns) => Api::namespaced(self.client.clone(), ns),
|
|
None => Api::default_namespaced(self.client.clone()),
|
|
};
|
|
api.delete(name, &kube::api::DeleteParams::default())
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn wait_until_deployment_ready(
|
|
&self,
|
|
name: &str,
|
|
namespace: Option<&str>,
|
|
timeout: Option<Duration>,
|
|
) -> Result<(), String> {
|
|
let api: Api<Deployment> = match namespace {
|
|
Some(ns) => Api::namespaced(self.client.clone(), ns),
|
|
None => Api::default_namespaced(self.client.clone()),
|
|
};
|
|
let timeout = timeout.unwrap_or(Duration::from_secs(120));
|
|
let establish = await_condition(api, name, conditions::is_deployment_completed());
|
|
tokio::time::timeout(timeout, establish)
|
|
.await
|
|
.map(|_| ())
|
|
.map_err(|_| "Timed out waiting for deployment".to_string())
|
|
}
|
|
|
|
/// Gets a single named resource, using the correct API scope for `K`.
|
|
pub async fn get_resource<K>(
|
|
&self,
|
|
name: &str,
|
|
namespace: Option<&str>,
|
|
) -> Result<Option<K>, Error>
|
|
where
|
|
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
|
|
<K as Resource>::Scope: ScopeResolver<K>,
|
|
<K as Resource>::DynamicType: Default,
|
|
{
|
|
let api: Api<K> =
|
|
<<K as Resource>::Scope as ScopeResolver<K>>::get_api(&self.client, namespace);
|
|
api.get_opt(name).await
|
|
}
|
|
|
|
/// Deletes a single named resource. Returns `Ok(())` on success or if the
|
|
/// resource was already absent (idempotent).
|
|
pub async fn delete_resource<K>(&self, name: &str, namespace: Option<&str>) -> Result<(), Error>
|
|
where
|
|
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
|
|
<K as Resource>::Scope: ScopeResolver<K>,
|
|
<K as Resource>::DynamicType: Default,
|
|
{
|
|
let api: Api<K> =
|
|
<<K as Resource>::Scope as ScopeResolver<K>>::get_api(&self.client, namespace);
|
|
match api.delete(name, &kube::api::DeleteParams::default()).await {
|
|
Ok(_) => Ok(()),
|
|
Err(Error::Api(ErrorResponse { code: 404, .. })) => Ok(()),
|
|
Err(e) => Err(e),
|
|
}
|
|
}
|
|
|
|
pub async fn list_resources<K>(
|
|
&self,
|
|
namespace: Option<&str>,
|
|
list_params: Option<ListParams>,
|
|
) -> Result<ObjectList<K>, Error>
|
|
where
|
|
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
|
|
<K as Resource>::Scope: ScopeResolver<K>,
|
|
<K as Resource>::DynamicType: Default,
|
|
{
|
|
let api: Api<K> =
|
|
<<K as Resource>::Scope as ScopeResolver<K>>::get_api(&self.client, namespace);
|
|
api.list(&list_params.unwrap_or_default()).await
|
|
}
|
|
|
|
pub async fn list_all_resources_with_labels<K>(&self, labels: &str) -> Result<Vec<K>, Error>
|
|
where
|
|
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
|
|
<K as Resource>::DynamicType: Default,
|
|
{
|
|
Api::<K>::all(self.client.clone())
|
|
.list(&ListParams::default().labels(labels))
|
|
.await
|
|
.map(|l| l.items)
|
|
}
|
|
|
|
pub async fn get_all_resource_in_all_namespace<K>(&self) -> Result<Vec<K>, Error>
|
|
where
|
|
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
|
|
<K as Resource>::Scope: ScopeResolver<K>,
|
|
<K as Resource>::DynamicType: Default,
|
|
{
|
|
Api::<K>::all(self.client.clone())
|
|
.list(&Default::default())
|
|
.await
|
|
.map(|l| l.items)
|
|
}
|
|
|
|
pub async fn get_nodes(
|
|
&self,
|
|
list_params: Option<ListParams>,
|
|
) -> Result<ObjectList<Node>, Error> {
|
|
self.list_resources(None, list_params).await
|
|
}
|
|
|
|
pub async fn namespace_exists(&self, name: &str) -> Result<bool, Error> {
|
|
let api: Api<Namespace> = Api::all(self.client.clone());
|
|
match api.get_opt(name).await? {
|
|
Some(_) => Ok(true),
|
|
None => Ok(false),
|
|
}
|
|
}
|
|
|
|
pub async fn create_namespace(&self, name: &str) -> Result<Namespace, Error> {
|
|
let namespace = Namespace {
|
|
metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
|
|
name: Some(name.to_string()),
|
|
..Default::default()
|
|
},
|
|
..Default::default()
|
|
};
|
|
let api: Api<Namespace> = Api::all(self.client.clone());
|
|
api.create(&kube::api::PostParams::default(), &namespace)
|
|
.await
|
|
}
|
|
|
|
pub async fn wait_for_namespace(
|
|
&self,
|
|
name: &str,
|
|
timeout: Option<Duration>,
|
|
) -> Result<(), Error> {
|
|
let api: Api<Namespace> = Api::all(self.client.clone());
|
|
let timeout = timeout.unwrap_or(Duration::from_secs(60));
|
|
let start = std::time::Instant::now();
|
|
|
|
loop {
|
|
if start.elapsed() > timeout {
|
|
return Err(Error::Api(ErrorResponse {
|
|
status: "Timeout".to_string(),
|
|
message: format!("Namespace '{}' not ready within timeout", name),
|
|
reason: "Timeout".to_string(),
|
|
code: 408,
|
|
}));
|
|
}
|
|
|
|
match api.get_opt(name).await? {
|
|
Some(ns) => {
|
|
if let Some(status) = ns.status
|
|
&& status.phase == Some("Active".to_string())
|
|
{
|
|
return Ok(());
|
|
}
|
|
}
|
|
None => {
|
|
return Err(Error::Api(ErrorResponse {
|
|
status: "NotFound".to_string(),
|
|
message: format!("Namespace '{}' not found", name),
|
|
reason: "NotFound".to_string(),
|
|
code: 404,
|
|
}));
|
|
}
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
|
}
|
|
}
|
|
}
|