Compare commits

..

3 Commits

Author SHA1 Message Date
1802b10ddf fix:translated documentaion notes into English 2025-10-23 15:31:45 -04:00
dd3f07e5b7 doc for removing worker flag from cp on UPI
All checks were successful
Run Check Script / check (pull_request) Successful in 1m13s
2025-10-09 15:28:42 -04:00
cbbaae2ac8 okd_enable_user_workload_monitoring (#160)
Reviewed-on: #160
Co-authored-by: Willem <wrolleman@nationtech.io>
Co-committed-by: Willem <wrolleman@nationtech.io>
2025-09-29 14:32:38 +00:00
7 changed files with 255 additions and 301 deletions

View File

@@ -0,0 +1,56 @@
## **Remove Worker flag from OKD Control Planes**
### **Context**
On OKD user provisioned infrastructure the control plane nodes can have the flag node-role.kubernetes.io/worker which allows non critical workloads to be scheduled on the control-planes
### **Observed Symptoms**
- After adding HAProxy servers to the backend each back end appears down
- Traffic is redirected to the control planes instead of workers
- The pods router-default are incorrectly applied on the control planes rather than on the workers
- Pods are being scheduled on the control planes causing cluster instability
```
ss -tlnp | grep 80
```
- shows process haproxy is listening at 0.0.0.0:80 on cps
- same problem for port 443
- In namespace rook-ceph certain pods are deploted on cps rather than on worker nodes
### **Cause**
- when intalling UPI, the roles (master, worker) are not managed by the Machine Config operator and the cps are made schedulable by default.
### **Diagnostic**
check node labels:
```
oc get nodes --show-labels | grep control-plane
```
Inspecter kubelet configuration:
```
cat /etc/systemd/system/kubelet.service
```
find the line:
```
--node-labels=node-role.kubernetes.io/control-plane,node-role.kubernetes.io/master,node-role.kubernetes.io/worker
```
→ presence of label worker confirms the problem.
Verify the flag doesnt come from MCO
```
oc get machineconfig | grep rendered-master
```
**Solution:**
To make the control planes non schedulable you must patch the cluster scheduler resource
```
oc patch scheduler cluster --type merge -p '{"spec":{"mastersSchedulable":false}}'
```
after the patch is applied the workloads can be deplaced by draining the nodes
```
oc adm cordon <cp-node>
oc adm drain <cp-node> --ignore-daemonsets delete-emptydir-data
```

View File

@@ -1,7 +1,12 @@
use std::time::Duration;
use derive_new::new;
use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope,
api::{apps::v1::Deployment, core::v1::Pod},
api::{
apps::v1::Deployment,
core::v1::{Pod, PodStatus},
},
};
use kube::{
Client, Config, Error, Resource,
@@ -20,9 +25,7 @@ use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::{Value, json};
use similar::TextDiff;
use tokio::io::AsyncReadExt;
use crate::interpret::Outcome;
use tokio::{io::AsyncReadExt, time::sleep};
#[derive(new, Clone)]
pub struct K8sClient {
@@ -56,57 +59,6 @@ impl K8sClient {
})
}
pub async fn ensure_deployment(
&self,
resource_name: &str,
resource_namespace: &str,
) -> Result<Outcome, Error> {
match self
.get_deployment(resource_name, Some(&resource_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
resource_name, ready_count
)))
} else {
Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Deployment '{}' in namespace '{}' has 0 ready replicas",
resource_name, resource_namespace
))))
}
} else {
Err(Error::Api(ErrorResponse {
status: "Failure".to_string(),
message: format!(
"No status found for deployment '{}' in namespace '{}'",
resource_name, resource_namespace
),
reason: "MissingStatus".to_string(),
code: 404,
}))
}
}
Ok(None) => Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Deployment '{}' not found in namespace '{}'",
resource_name, resource_namespace
)))),
Err(e) => Err(Error::Api(ErrorResponse {
status: "Failure".to_string(),
message: format!(
"Failed to fetch deployment '{}' in namespace '{}': {}",
resource_name, resource_namespace, e
),
reason: "ApiError".to_string(),
code: 500,
})),
}
}
pub async fn get_resource_json_value(
&self,
name: &str,
@@ -144,25 +96,6 @@ impl K8sClient {
Ok(pods.get_opt(name).await?)
}
pub async fn patch_resource_by_merge(
&self,
name: &str,
namespace: Option<&str>,
gvk: &GroupVersionKind,
patch: Value,
) -> Result<(), Error> {
let gvk = ApiResource::from_gvk(gvk);
let resource: Api<DynamicObject> = if let Some(ns) = namespace {
Api::namespaced_with(self.client.clone(), ns, &gvk)
} else {
Api::default_namespaced_with(self.client.clone(), &gvk)
};
let pp = PatchParams::default();
let merge = Patch::Merge(&patch);
resource.patch(name, &pp, &merge).await?;
Ok(())
}
pub async fn scale_deployment(
&self,
name: &str,
@@ -226,6 +159,41 @@ impl K8sClient {
}
}
pub async fn wait_for_pod_ready(
&self,
pod_name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let mut elapsed = 0;
let interval = 5; // seconds between checks
let timeout_secs = 120;
loop {
let pod = self.get_pod(pod_name, namespace).await?;
if let Some(p) = pod {
if let Some(status) = p.status {
if let Some(phase) = status.phase {
if phase.to_lowercase() == "running" {
return Ok(());
}
}
}
}
if elapsed >= timeout_secs {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"'{}' in ns '{}' did not become ready within {}s",
pod_name,
namespace.unwrap(),
timeout_secs
))));
}
sleep(Duration::from_secs(interval)).await;
elapsed += interval;
}
}
/// Will execute a commond in the first pod found that matches the specified label
/// '{label}={name}'
pub async fn exec_app_capture_output(
@@ -492,9 +460,12 @@ impl K8sClient {
.as_str()
.expect("couldn't get kind as str");
let split: Vec<&str> = api_version.splitn(2, "/").collect();
let g = split[0];
let v = split[1];
let mut it = api_version.splitn(2, '/');
let first = it.next().unwrap();
let (g, v) = match it.next() {
Some(second) => (first, second),
None => ("", first),
};
let gvk = GroupVersionKind::gvk(g, v, kind);
let api_resource = ApiResource::from_gvk(&gvk);

View File

@@ -1,3 +1,2 @@
mod helm;
pub mod update_default_okd_ingress_score;
pub use helm::*;

View File

@@ -1,223 +0,0 @@
use std::{
fs::File,
io::Read,
path::{Path, PathBuf},
sync::Arc,
};
use base64::{Engine, prelude::BASE64_STANDARD};
use fqdn::Path;
use harmony_types::id::Id;
use kube::api::GroupVersionKind;
use serde_json::json;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
pub struct UpdateDefaultOkdIngressScore {
operator_name: String,
operator_namespace: String,
ca_name: String,
path_to_tls_crt: Path,
path_to_tls_key: Path,
path_to_ca_cert: Path,
}
impl<T: Topology> Score<T> for UpdateDefaultOkdIngressScore {
fn name(&self) -> String {
"UpdateDefaultOkdIngressScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(UpdateDefaultOkdIngressInterpret {
score: self.clone(),
})
}
}
pub struct UpdateDefaultOkdIngressInterpret {
score: UpdateDefaultOkdIngressScore,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for UpdateDefaultOkdIngressInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await?;
let secret_name = "ingress_ca_secret";
self.ensure_ingress_operator(
&client,
&self.score.operator_name,
&self.score.operator_namespace,
)
.await?;
self.create_ca_cm(&client, self.score.path_to_ca_cert, &self.score.ca_name)
.await?;
self.patch_proxy(&client, &self.score.ca_name).await?;
self.create_tls_secret(
&client,
self.score.path_to_tls_crt,
self.score.path_to_tls_key,
&self.score.operator_namespace,
&secret_name,
)
.await?;
self.patch_ingress(&client, &self.score.operator_namespace, &secret_name)
.await?;
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("UpdateDefaultOkdIngress")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl UpdateDefaultOkdIngressInterpret {
async fn ensure_ingress_operator(
&self,
client: &Arc<K8sClient>,
operator_name: &str,
operator_namespace: &str,
) -> Result<Outcome, InterpretError> {
client
.ensure_deployment(operator_name, Some(operator_namespace))
.await?
}
fn open_path(&self, path: Path) -> Result<String, InterpretError> {
let mut file = match File::open(&path) {
Ok(file) => file,
Err(e) => InterpretError::new(format!("Could not open file {}", e)),
};
let s = String::new();
match file.read_to_string(&mut s) {
Ok(s) => Ok(s),
Err(e) => InterpretError::new(format!("Could not read file {}", e)),
}
}
async fn create_ca_cm(
&self,
client: &Arc<K8sClient>,
path_to_ca_cert: Path,
ca_name: &str,
) -> Result<Outcome, InterpretError> {
let ca_bundle = BASE64_STANDARD.encode(self.open_path(path_to_ca_cert).unwrap().as_bytes());
let cm = format!(
r"#
apiVersion: v1
kind: ConfigMap
metadata:
name: custom-ca
namespace: openshift-config
data:
ca-bundle.crt: {ca_bundle}
#"
);
client.apply_yaml(serde_yaml::to_value(&cm), Some("openshift-config")).await?;
Ok(Outcome::success(format!(
"successfully created cm : {} in openshift-config namespace",
ca_name
)))
}
async fn patch_proxy(
&self,
client: &Arc<K8sClient>,
ca_name: &str,
) -> Result<Outcome, InterpretError> {
let gvk = GroupVersionKind {
group: "config.openshift.io".to_string(),
version: "v1".to_string(),
kind: "Proxy".to_string(),
};
let patch = json!({
"spec": {
"trustedCA": {
"name": ca_name
}
}
});
client
.patch_resource_by_merge("cluster", None, &gvk, patch)
.await?;
Ok(Outcome::success(format!(
"successfully merged trusted ca to cluster proxy"
)))
}
async fn create_tls_secret(
&self,
client: &Arc<K8sClient>,
tls_crt: Path,
tls_key: Path,
operator_namespace: &str,
secret_name: &str,
) -> Result<Outcome, InterpretError> {
let base64_tls_crt = BASE64_STANDARD.encode(self.open_path(tls_crt).unwrap().as_bytes());
let base64_tls_key = BASE64_STANDARD.encode(self.open_path(tls_key).unwrap().as_bytes());
let secret = format!(
r#"
apiVersion: v1
kind: Secret
metadata:
name: secret-tls
namespace: {operator_namespace}
type: kubernetes.io/tls
data:
# values are base64 encoded, which obscures them but does NOT provide
# any useful level of confidentiality
# Replace the following values with your own base64-encoded certificate and key.
tls.crt: "{base64_tls_crt}"
tls.key: "{base64_tls_key}"
"#
);
client
.apply_yaml(serde_yaml::to_value(secret), Some(operator_namespace))
.await?;
Ok(Outcome::success(format!(
"successfully created tls secret trusted ca to cluster proxy"
)))
}
async fn patch_ingress(
&self,
client: &Arc<K8sClient>,
operator_namespace: &str,
secret_name: &str,
) -> Result<Outcome, InterpretError> {
let gvk = GroupVersionKind {
group: "operator.openshift.io".to_string(),
version: "v1".to_string(),
kind: "IngressController".to_string(),
};
let patch = json!(
{"spec":{"defaultCertificate": {"name": secret_name}}});
client
.patch_resource_by_merge("default", Some(operator_namespace), &gvk, patch)
.await?;
Ok(Outcome::success(format!("successfully pathed ingress operator to use secret {}", secret_name)))
}
}

View File

@@ -4,4 +4,5 @@ pub mod application_monitoring;
pub mod grafana;
pub mod kube_prometheus;
pub mod ntfy;
pub mod okd;
pub mod prometheus;

View File

@@ -0,0 +1,149 @@
use std::{collections::BTreeMap, sync::Arc};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::api::ObjectMeta;
use serde::Serialize;
#[derive(Clone, Debug, Serialize)]
pub struct OpenshiftUserWorkloadMonitoring {}
impl<T: Topology + K8sclient> Score<T> for OpenshiftUserWorkloadMonitoring {
fn name(&self) -> String {
"OpenshiftUserWorkloadMonitoringScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OpenshiftUserWorkloadMonitoringInterpret {})
}
}
#[derive(Clone, Debug, Serialize)]
pub struct OpenshiftUserWorkloadMonitoringInterpret {}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.update_cluster_monitoring_config_cm(&client).await?;
self.update_user_workload_monitoring_config_cm(&client)
.await?;
self.verify_user_workload(&client).await?;
Ok(Outcome::success(
"successfully enabled user-workload-monitoring".to_string(),
))
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OpenshiftUserWorkloadMonitoring")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl OpenshiftUserWorkloadMonitoringInterpret {
pub async fn update_cluster_monitoring_config_cm(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut data = BTreeMap::new();
data.insert(
"config.yaml".to_string(),
r#"
enableUserWorkload: true
alertmanagerMain:
enableUserAlertmanagerConfig: true
"#
.to_string(),
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("cluster-monitoring-config".to_string()),
namespace: Some("openshift-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client.apply(&cm, Some("openshift-monitoring")).await?;
Ok(Outcome::success(
"updated cluster-monitoring-config-map".to_string(),
))
}
pub async fn update_user_workload_monitoring_config_cm(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut data = BTreeMap::new();
data.insert(
"config.yaml".to_string(),
r#"
alertmanager:
enabled: true
enableAlertmanagerConfig: true
"#
.to_string(),
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("user-workload-monitoring-config".to_string()),
namespace: Some("openshift-user-workload-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client
.apply(&cm, Some("openshift-user-workload-monitoring"))
.await?;
Ok(Outcome::success(
"updated openshift-user-monitoring-config-map".to_string(),
))
}
pub async fn verify_user_workload(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let namespace = "openshift-user-workload-monitoring";
let alertmanager_name = "alertmanager-user-workload-0";
let prometheus_name = "prometheus-user-workload-0";
client
.wait_for_pod_ready(alertmanager_name, Some(namespace))
.await?;
client
.wait_for_pod_ready(prometheus_name, Some(namespace))
.await?;
Ok(Outcome::success(format!(
"pods: {}, {} ready in ns: {}",
alertmanager_name, prometheus_name, namespace
)))
}
}

View File

@@ -0,0 +1 @@
pub mod enable_user_workload;