feat: introduced crate tokio-retry to allow multiple attempts to get secret from k8s #225
@@ -79,6 +79,7 @@ sqlx.workspace = true
|
||||
inquire.workspace = true
|
||||
brocade = { path = "../brocade" }
|
||||
option-ext = "0.2.0"
|
||||
tokio-retry = "0.3.0"
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions.workspace = true
|
||||
|
||||
@@ -16,10 +16,7 @@ use kube::{
|
||||
};
|
||||
use log::{debug, info, trace, warn};
|
||||
use serde::Serialize;
|
||||
use tokio::{
|
||||
sync::OnceCell,
|
||||
time::{Instant, sleep},
|
||||
};
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
use crate::{
|
||||
executors::ExecutorError,
|
||||
@@ -513,45 +510,41 @@ impl CertificateManagement for K8sAnywhereTopology {
|
||||
{
|
||||
let secret_name = certificate.spec.secret_name.clone();
|
||||
|
||||
let deadline = Instant::now() + tokio::time::Duration::from_secs(120);
|
||||
debug!("Secret Name {:#?}", secret_name);
|
||||
if let Some(secret) = client
|
||||
.get_resource::<Secret>(&secret_name, Some(&namespace))
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ExecutorError::UnexpectedError(format!(
|
||||
"secret {} not found in namespace {}: {}",
|
||||
secret_name, namespace, e
|
||||
))
|
||||
})?
|
||||
{
|
||||
let ca_cert = secret
|
||||
.data
|
||||
.as_ref()
|
||||
.and_then(|d| d.get("ca.crt"))
|
||||
.ok_or_else(|| {
|
||||
ExecutorError::UnexpectedError("Secret missing key 'ca.crt'".into())
|
||||
})?;
|
||||
|
||||
loop {
|
||||
if Instant::now() > deadline {
|
||||
return Err(ExecutorError::UnexpectedError(format!(
|
||||
"Timed out waiting for Secret {} to contain ca.crt",
|
||||
secret_name
|
||||
)));
|
||||
}
|
||||
let ca_cert = String::from_utf8(ca_cert.0.clone()).map_err(|_| {
|
||||
ExecutorError::UnexpectedError("ca.crt is not valid UTF-8".into())
|
||||
})?;
|
||||
|
||||
debug!("Secret Name {:#?}", secret_name);
|
||||
|
||||
if let Some(secret) = client
|
||||
.get_resource::<Secret>(&secret_name, Some(&namespace))
|
||||
.await
|
||||
.map_err(|e| {
|
||||
ExecutorError::UnexpectedError(format!(
|
||||
"secret {} not found in namespace {}: {}",
|
||||
secret_name, namespace, e
|
||||
))
|
||||
})?
|
||||
{
|
||||
if let Some(ca_cert) = secret.data.as_ref().and_then(|d| d.get("ca.crt")) {
|
||||
let ca_cert = String::from_utf8(ca_cert.0.clone()).map_err(|_| {
|
||||
ExecutorError::UnexpectedError("ca.crt is not valid UTF-8".into())
|
||||
})?;
|
||||
|
||||
return Ok(ca_cert);
|
||||
}
|
||||
}
|
||||
|
||||
// Secret not ready yet → wait and retry
|
||||
sleep(Duration::from_millis(500)).await;
|
||||
return Ok(ca_cert);
|
||||
} else {
|
||||
Err(ExecutorError::UnexpectedError(format!(
|
||||
"Error getting secret associated with cert_name: {}, secret_name: {}",
|
||||
cert_name, secret_name
|
||||
)))
|
||||
}
|
||||
} else {
|
||||
Err(ExecutorError::UnexpectedError(format!(
|
||||
return Err(ExecutorError::UnexpectedError(format!(
|
||||
"Certificate {} not found in namespace {}",
|
||||
cert_name, namespace
|
||||
)))
|
||||
)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,8 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, info};
|
||||
use tokio_retry::{Retry, strategy::ExponentialBackoff};
|
||||
|
||||
use crate::{
|
||||
inventory::Inventory,
|
||||
@@ -190,10 +193,28 @@ impl Nats for K8sAnywhereTopology {
|
||||
nats_cluster.name
|
||||
);
|
||||
|
||||
let ca_cert = self
|
||||
.get_ca_certificate(root_ca_cert_name, &root_ca_config)
|
||||
.await
|
||||
.map_err(|e| format!("{e}"))?;
|
||||
let strategy = ExponentialBackoff::from_millis(250)
|
||||
.factor(2)
|
||||
.max_delay(Duration::from_millis(1000))
|
||||
.take(10);
|
||||
|
||||
let ca_cert = Retry::spawn(strategy, || async {
|
||||
log::debug!("Attempting CA cert fetch");
|
||||
|
||||
let res = self
|
||||
.get_ca_certificate(root_ca_cert_name.clone(), &root_ca_config)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(cert) => Ok(cert),
|
||||
Err(e) => {
|
||||
log::warn!("Retryable error: {:?}", e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| format!("Retries exhausted: {:?}", e))?;
|
||||
|
||||
Ok(ca_cert)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user