diff --git a/harmony/Cargo.toml b/harmony/Cargo.toml index 634cbe96..a1a6e7b3 100644 --- a/harmony/Cargo.toml +++ b/harmony/Cargo.toml @@ -79,6 +79,7 @@ sqlx.workspace = true inquire.workspace = true brocade = { path = "../brocade" } option-ext = "0.2.0" +tokio-retry = "0.3.0" [dev-dependencies] pretty_assertions.workspace = true diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index 97d02e8c..ee705e15 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -16,10 +16,7 @@ use kube::{ }; use log::{debug, info, trace, warn}; use serde::Serialize; -use tokio::{ - sync::OnceCell, - time::{Instant, sleep}, -}; +use tokio::sync::OnceCell; use crate::{ executors::ExecutorError, @@ -513,45 +510,41 @@ impl CertificateManagement for K8sAnywhereTopology { { let secret_name = certificate.spec.secret_name.clone(); - let deadline = Instant::now() + tokio::time::Duration::from_secs(120); + debug!("Secret Name {:#?}", secret_name); + if let Some(secret) = client + .get_resource::(&secret_name, Some(&namespace)) + .await + .map_err(|e| { + ExecutorError::UnexpectedError(format!( + "secret {} not found in namespace {}: {}", + secret_name, namespace, e + )) + })? + { + let ca_cert = secret + .data + .as_ref() + .and_then(|d| d.get("ca.crt")) + .ok_or_else(|| { + ExecutorError::UnexpectedError("Secret missing key 'ca.crt'".into()) + })?; - loop { - if Instant::now() > deadline { - return Err(ExecutorError::UnexpectedError(format!( - "Timed out waiting for Secret {} to contain ca.crt", - secret_name - ))); - } + let ca_cert = String::from_utf8(ca_cert.0.clone()).map_err(|_| { + ExecutorError::UnexpectedError("ca.crt is not valid UTF-8".into()) + })?; - debug!("Secret Name {:#?}", secret_name); - - if let Some(secret) = client - .get_resource::(&secret_name, Some(&namespace)) - .await - .map_err(|e| { - ExecutorError::UnexpectedError(format!( - "secret {} not found in namespace {}: {}", - secret_name, namespace, e - )) - })? - { - if let Some(ca_cert) = secret.data.as_ref().and_then(|d| d.get("ca.crt")) { - let ca_cert = String::from_utf8(ca_cert.0.clone()).map_err(|_| { - ExecutorError::UnexpectedError("ca.crt is not valid UTF-8".into()) - })?; - - return Ok(ca_cert); - } - } - - // Secret not ready yet → wait and retry - sleep(Duration::from_millis(500)).await; + return Ok(ca_cert); + } else { + Err(ExecutorError::UnexpectedError(format!( + "Error getting secret associated with cert_name: {}, secret_name: {}", + cert_name, secret_name + ))) } } else { - Err(ExecutorError::UnexpectedError(format!( + return Err(ExecutorError::UnexpectedError(format!( "Certificate {} not found in namespace {}", cert_name, namespace - ))) + ))); } } } diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs index 3fcba127..7e1f24cb 100644 --- a/harmony/src/domain/topology/k8s_anywhere/nats.rs +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -1,5 +1,8 @@ +use std::time::Duration; + use async_trait::async_trait; use log::{debug, info}; +use tokio_retry::{Retry, strategy::ExponentialBackoff}; use crate::{ inventory::Inventory, @@ -190,10 +193,28 @@ impl Nats for K8sAnywhereTopology { nats_cluster.name ); - let ca_cert = self - .get_ca_certificate(root_ca_cert_name, &root_ca_config) - .await - .map_err(|e| format!("{e}"))?; + let strategy = ExponentialBackoff::from_millis(250) + .factor(2) + .max_delay(Duration::from_millis(1000)) + .take(10); + + let ca_cert = Retry::spawn(strategy, || async { + log::debug!("Attempting CA cert fetch"); + + let res = self + .get_ca_certificate(root_ca_cert_name.clone(), &root_ca_config) + .await; + + match res { + Ok(cert) => Ok(cert), + Err(e) => { + log::warn!("Retryable error: {:?}", e); + Err(e) + } + } + }) + .await + .map_err(|e| format!("Retries exhausted: {:?}", e))?; Ok(ca_cert) }