feat: introduced crate tokio-retry to allow multiple attempts to get secret from k8s #225
@@ -79,6 +79,7 @@ sqlx.workspace = true
|
|||||||
inquire.workspace = true
|
inquire.workspace = true
|
||||||
brocade = { path = "../brocade" }
|
brocade = { path = "../brocade" }
|
||||||
option-ext = "0.2.0"
|
option-ext = "0.2.0"
|
||||||
|
tokio-retry = "0.3.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
pretty_assertions.workspace = true
|
pretty_assertions.workspace = true
|
||||||
|
|||||||
@@ -16,10 +16,7 @@ use kube::{
|
|||||||
};
|
};
|
||||||
use log::{debug, info, trace, warn};
|
use log::{debug, info, trace, warn};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tokio::{
|
use tokio::sync::OnceCell;
|
||||||
sync::OnceCell,
|
|
||||||
time::{Instant, sleep},
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
executors::ExecutorError,
|
executors::ExecutorError,
|
||||||
@@ -513,45 +510,41 @@ impl CertificateManagement for K8sAnywhereTopology {
|
|||||||
{
|
{
|
||||||
let secret_name = certificate.spec.secret_name.clone();
|
let secret_name = certificate.spec.secret_name.clone();
|
||||||
|
|
||||||
let deadline = Instant::now() + tokio::time::Duration::from_secs(120);
|
debug!("Secret Name {:#?}", secret_name);
|
||||||
|
if let Some(secret) = client
|
||||||
|
.get_resource::<Secret>(&secret_name, Some(&namespace))
|
||||||
|
.await
|
||||||
|
.map_err(|e| {
|
||||||
|
ExecutorError::UnexpectedError(format!(
|
||||||
|
"secret {} not found in namespace {}: {}",
|
||||||
|
secret_name, namespace, e
|
||||||
|
))
|
||||||
|
})?
|
||||||
|
{
|
||||||
|
let ca_cert = secret
|
||||||
|
.data
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|d| d.get("ca.crt"))
|
||||||
|
.ok_or_else(|| {
|
||||||
|
ExecutorError::UnexpectedError("Secret missing key 'ca.crt'".into())
|
||||||
|
})?;
|
||||||
|
|
||||||
loop {
|
let ca_cert = String::from_utf8(ca_cert.0.clone()).map_err(|_| {
|
||||||
if Instant::now() > deadline {
|
ExecutorError::UnexpectedError("ca.crt is not valid UTF-8".into())
|
||||||
return Err(ExecutorError::UnexpectedError(format!(
|
})?;
|
||||||
"Timed out waiting for Secret {} to contain ca.crt",
|
|
||||||
secret_name
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("Secret Name {:#?}", secret_name);
|
return Ok(ca_cert);
|
||||||
|
} else {
|
||||||
if let Some(secret) = client
|
Err(ExecutorError::UnexpectedError(format!(
|
||||||
.get_resource::<Secret>(&secret_name, Some(&namespace))
|
"Error getting secret associated with cert_name: {}, secret_name: {}",
|
||||||
.await
|
cert_name, secret_name
|
||||||
.map_err(|e| {
|
)))
|
||||||
ExecutorError::UnexpectedError(format!(
|
|
||||||
"secret {} not found in namespace {}: {}",
|
|
||||||
secret_name, namespace, e
|
|
||||||
))
|
|
||||||
})?
|
|
||||||
{
|
|
||||||
if let Some(ca_cert) = secret.data.as_ref().and_then(|d| d.get("ca.crt")) {
|
|
||||||
let ca_cert = String::from_utf8(ca_cert.0.clone()).map_err(|_| {
|
|
||||||
ExecutorError::UnexpectedError("ca.crt is not valid UTF-8".into())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
return Ok(ca_cert);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Secret not ready yet → wait and retry
|
|
||||||
sleep(Duration::from_millis(500)).await;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
Err(ExecutorError::UnexpectedError(format!(
|
return Err(ExecutorError::UnexpectedError(format!(
|
||||||
"Certificate {} not found in namespace {}",
|
"Certificate {} not found in namespace {}",
|
||||||
cert_name, namespace
|
cert_name, namespace
|
||||||
)))
|
)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use log::{debug, info};
|
use log::{debug, info};
|
||||||
|
use tokio_retry::{Retry, strategy::ExponentialBackoff};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
inventory::Inventory,
|
inventory::Inventory,
|
||||||
@@ -190,10 +193,28 @@ impl Nats for K8sAnywhereTopology {
|
|||||||
nats_cluster.name
|
nats_cluster.name
|
||||||
);
|
);
|
||||||
|
|
||||||
let ca_cert = self
|
let strategy = ExponentialBackoff::from_millis(250)
|
||||||
.get_ca_certificate(root_ca_cert_name, &root_ca_config)
|
.factor(2)
|
||||||
.await
|
.max_delay(Duration::from_millis(1000))
|
||||||
.map_err(|e| format!("{e}"))?;
|
.take(10);
|
||||||
|
|
||||||
|
let ca_cert = Retry::spawn(strategy, || async {
|
||||||
|
log::debug!("Attempting CA cert fetch");
|
||||||
|
|
||||||
|
let res = self
|
||||||
|
.get_ca_certificate(root_ca_cert_name.clone(), &root_ca_config)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(cert) => Ok(cert),
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("Retryable error: {:?}", e);
|
||||||
|
Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Retries exhausted: {:?}", e))?;
|
||||||
|
|
||||||
Ok(ca_cert)
|
Ok(ca_cert)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user