Compare commits

...

11 Commits

Author SHA1 Message Date
82d1f87ff8 fix: stop swallowing non-404 errors in ResourceBundle::delete
Previously all errors were silently discarded when deleting bundle
resources. Now only 404 (Not Found) is ignored; other errors propagate.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 00:09:28 -04:00
9a67bcc96f Merge pull request 'fix/cnpgInstallation' (#251) from fix/cnpgInstallation into master
Some checks failed
Run Check Script / check (push) Successful in 1m45s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 2m15s
Reviewed-on: #251
2026-03-20 21:02:53 +00:00
a377fc1404 Merge branch 'master' into fix/cnpgInstallation
All checks were successful
Run Check Script / check (pull_request) Successful in 1m44s
2026-03-20 20:56:30 +00:00
c9977fee12 fix: CI file moved
All checks were successful
Run Check Script / check (pull_request) Successful in 2m5s
2026-03-20 16:48:38 -04:00
64bf585e07 fix: remove check.sh with broken path handling and fix formatting
Some checks failed
Run Check Script / check (pull_request) Failing after 12s
2026-03-20 16:41:30 -04:00
44e2c45435 fix: flaky tests due to bad environment variables handling in harmony_config crate 2026-03-20 16:40:08 -04:00
cdccbc8939 fix: formatting and minor stuff 2026-03-20 16:34:48 -04:00
9830971d05 feat: Create namespace in k8s client and wait for namespace ready utility functions 2026-03-20 16:15:51 -04:00
e1183ef6de feat: K8s postgresql score now ensures cnpg is installed 2026-03-20 07:02:26 -04:00
8499f4d1b7 Merge pull request 'fix: small details were preventing to re-save frontends,backends and healthchecks in opnsense UI' (#248) from fix/load-balancer-xml into master
Some checks failed
Run Check Script / check (push) Has been cancelled
Compile and package harmony_composer / package_harmony_composer (push) Has been cancelled
Reviewed-on: #248
2026-03-17 14:38:35 +00:00
67c3265286 fix: small details were preventing to re-save frontends,backends and healthchecks in opnsense UI
All checks were successful
Run Check Script / check (pull_request) Successful in 2m12s
2026-03-13 10:31:17 -04:00
14 changed files with 367 additions and 95 deletions

View File

@@ -15,4 +15,4 @@ jobs:
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Run check script - name: Run check script
run: bash check.sh run: bash build/check.sh

View File

@@ -1 +0,0 @@
build/check.sh

View File

@@ -52,7 +52,7 @@
//! } //! }
//! ``` //! ```
use kube::{Error, Resource, ResourceExt, api::DynamicObject}; use kube::{Error, Resource, ResourceExt, api::DynamicObject, core::ErrorResponse};
use serde::Serialize; use serde::Serialize;
use serde_json; use serde_json;
@@ -117,16 +117,13 @@ impl ResourceBundle {
/// Delete all resources in this bundle from the cluster. /// Delete all resources in this bundle from the cluster.
/// Resources are deleted in reverse order to respect dependencies. /// Resources are deleted in reverse order to respect dependencies.
pub async fn delete(&self, client: &K8sClient) -> Result<(), Error> { pub async fn delete(&self, client: &K8sClient) -> Result<(), Error> {
// FIXME delete all in parallel and retry using kube::client::retry::RetryPolicy
for res in self.resources.iter().rev() { for res in self.resources.iter().rev() {
let api = client.get_api_for_dynamic_object(res, res.namespace().as_deref())?; let api = client.get_api_for_dynamic_object(res, res.namespace().as_deref())?;
let name = res.name_any(); let name = res.name_any();
// FIXME this swallows all errors. Swallowing a 404 is ok but other errors must be match api.delete(&name, &kube::api::DeleteParams::default()).await {
// handled properly (such as retrying). A normal error case is when we delete a Ok(_) | Err(Error::Api(ErrorResponse { code: 404, .. })) => {}
// resource bundle with dependencies between various resources. Such as a pod with a Err(e) => return Err(e),
// dependency on a ClusterRoleBinding. Trying to delete the ClusterRoleBinding first }
// is expected to fail
let _ = api.delete(&name, &kube::api::DeleteParams::default()).await;
} }
Ok(()) Ok(())
} }

View File

@@ -2,13 +2,14 @@ use std::collections::HashMap;
use k8s_openapi::api::{ use k8s_openapi::api::{
apps::v1::Deployment, apps::v1::Deployment,
core::v1::{Node, ServiceAccount}, core::v1::{Namespace, Node, ServiceAccount},
}; };
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition; use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::api::ApiResource; use kube::api::ApiResource;
use kube::{ use kube::{
Error, Resource, Error, Resource,
api::{Api, DynamicObject, GroupVersionKind, ListParams, ObjectList}, api::{Api, DynamicObject, GroupVersionKind, ListParams, ObjectList},
core::ErrorResponse,
runtime::conditions, runtime::conditions,
runtime::wait::await_condition, runtime::wait::await_condition,
}; };
@@ -313,4 +314,65 @@ impl K8sClient {
) -> Result<ObjectList<Node>, Error> { ) -> Result<ObjectList<Node>, Error> {
self.list_resources(None, list_params).await self.list_resources(None, list_params).await
} }
/// Returns `true` when a namespace with the given name exists in the cluster.
pub async fn namespace_exists(&self, name: &str) -> Result<bool, Error> {
    let namespaces: Api<Namespace> = Api::all(self.client.clone());
    // `get_opt` turns a 404 into `None`, so existence is simply `is_some`;
    // any other API error still propagates via `?`.
    Ok(namespaces.get_opt(name).await?.is_some())
}
/// Creates a namespace with the given name and returns the created object.
///
/// Fails with an API error (409 Conflict) if the namespace already exists;
/// callers such as `ensure_namespace` check existence first.
pub async fn create_namespace(&self, name: &str) -> Result<Namespace, Error> {
    let api: Api<Namespace> = Api::all(self.client.clone());
    // Only the metadata name is set; everything else uses cluster defaults.
    let desired = Namespace {
        metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
            name: Some(name.to_string()),
            ..Default::default()
        },
        ..Default::default()
    };
    api.create(&kube::api::PostParams::default(), &desired).await
}
/// Polls until the namespace reaches phase `Active`, or fails.
///
/// * `timeout` — overall deadline; defaults to 60 seconds when `None`.
///
/// Returns a synthetic 408 `Error::Api` on deadline expiry, and a synthetic
/// 404 `Error::Api` if the namespace does not exist at poll time.
///
/// NOTE(review): a missing namespace returns an error immediately rather
/// than being retried — callers are expected to create the namespace first
/// (see `ensure_namespace`); confirm this is the intended contract for a
/// "wait" helper.
pub async fn wait_for_namespace(
    &self,
    name: &str,
    timeout: Option<Duration>,
) -> Result<(), Error> {
    let api: Api<Namespace> = Api::all(self.client.clone());
    let timeout = timeout.unwrap_or(Duration::from_secs(60));
    let start = std::time::Instant::now();
    loop {
        // Deadline is checked before each poll, so the loop always makes at
        // least one attempt and never sleeps past the timeout unnoticed.
        if start.elapsed() > timeout {
            return Err(Error::Api(ErrorResponse {
                status: "Timeout".to_string(),
                message: format!("Namespace '{}' not ready within timeout", name),
                reason: "Timeout".to_string(),
                code: 408,
            }));
        }
        match api.get_opt(name).await? {
            Some(ns) => {
                // Namespace phase is reported in `status.phase`; only
                // "Active" counts as ready — anything else keeps polling.
                if let Some(status) = ns.status {
                    if status.phase == Some("Active".to_string()) {
                        return Ok(());
                    }
                }
            }
            None => {
                return Err(Error::Api(ErrorResponse {
                    status: "NotFound".to_string(),
                    message: format!("Namespace '{}' not found", name),
                    reason: "NotFound".to_string(),
                    code: 404,
                }));
            }
        }
        // Fixed 500 ms poll interval between attempts.
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
}
} }

View File

@@ -267,10 +267,16 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
SSL::Default => "".into(), SSL::Default => "".into(),
SSL::Other(other) => other.as_str().into(), SSL::Other(other) => other.as_str().into(),
}; };
let path_without_query = path.split_once('?').map_or(path.as_str(), |(p, _)| p);
let (port, port_name) = match port {
Some(port) => (Some(port.to_string()), port.to_string()),
None => (None, "serverport".to_string()),
};
let haproxy_check = HAProxyHealthCheck { let haproxy_check = HAProxyHealthCheck {
name: format!("HTTP_{http_method}_{path}"), name: format!("HTTP_{http_method}_{path_without_query}_{port_name}"),
uuid: Uuid::new_v4().to_string(), uuid: Uuid::new_v4().to_string(),
http_method: http_method.to_string().into(), http_method: http_method.to_string().to_lowercase().into(),
health_check_type: "http".to_string(), health_check_type: "http".to_string(),
http_uri: path.clone().into(), http_uri: path.clone().into(),
interval: "2s".to_string(), interval: "2s".to_string(),
@@ -314,7 +320,10 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
let mut backend = HAProxyBackend { let mut backend = HAProxyBackend {
uuid: Uuid::new_v4().to_string(), uuid: Uuid::new_v4().to_string(),
enabled: 1, enabled: 1,
name: format!("backend_{}", service.listening_port), name: format!(
"backend_{}",
service.listening_port.to_string().replace(':', "_")
),
algorithm: "roundrobin".to_string(), algorithm: "roundrobin".to_string(),
random_draws: Some(2), random_draws: Some(2),
stickiness_expire: "30m".to_string(), stickiness_expire: "30m".to_string(),
@@ -346,10 +355,22 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
let frontend = Frontend { let frontend = Frontend {
uuid: uuid::Uuid::new_v4().to_string(), uuid: uuid::Uuid::new_v4().to_string(),
enabled: 1, enabled: 1,
name: format!("frontend_{}", service.listening_port), name: format!(
"frontend_{}",
service.listening_port.to_string().replace(':', "_")
),
bind: service.listening_port.to_string(), bind: service.listening_port.to_string(),
mode: "tcp".to_string(), // TODO do not depend on health check here mode: "tcp".to_string(), // TODO do not depend on health check here
default_backend: Some(backend.uuid.clone()), default_backend: Some(backend.uuid.clone()),
stickiness_expire: "30m".to_string().into(),
stickiness_size: "50k".to_string().into(),
stickiness_conn_rate_period: "10s".to_string().into(),
stickiness_sess_rate_period: "10s".to_string().into(),
stickiness_http_req_rate_period: "10s".to_string().into(),
stickiness_http_err_rate_period: "10s".to_string().into(),
stickiness_bytes_in_rate_period: "1m".to_string().into(),
stickiness_bytes_out_rate_period: "1m".to_string().into(),
ssl_hsts_max_age: 15768000,
..Default::default() ..Default::default()
}; };
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp"); info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");

View File

@@ -1,11 +1,15 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use serde::Serialize; use serde::Serialize;
use std::str::FromStr;
use non_blank_string_rs::NonBlankString;
use crate::interpret::Interpret; use crate::interpret::Interpret;
use crate::modules::helm::chart::HelmChartScore;
use crate::modules::k8s::apps::crd::{Subscription, SubscriptionSpec}; use crate::modules::k8s::apps::crd::{Subscription, SubscriptionSpec};
use crate::modules::k8s::resource::K8sResourceScore; use crate::modules::k8s::resource::K8sResourceScore;
use crate::score::Score; use crate::score::Score;
use crate::topology::{K8sclient, Topology}; use crate::topology::{HelmCommand, K8sclient, Topology};
/// Install the CloudNativePg (CNPG) Operator via an OperatorHub `Subscription`. /// Install the CloudNativePg (CNPG) Operator via an OperatorHub `Subscription`.
/// ///
@@ -100,3 +104,41 @@ impl<T: Topology + K8sclient> Score<T> for CloudNativePgOperatorScore {
format!("CloudNativePgOperatorScore({})", self.namespace) format!("CloudNativePgOperatorScore({})", self.namespace)
} }
} }
/// Installs the CloudNativePG (CNPG) operator via its official Helm chart.
///
/// Helm-based alternative to the OperatorHub `Subscription` route
/// (`CloudNativePgOperatorScore`) for distributions without OLM (e.g. K3s).
/// Deploys `oci://ghcr.io/cloudnative-pg/charts/cloudnative-pg` with default
/// values, creating the target namespace if needed.
#[derive(Debug, Clone, Serialize)]
pub struct CloudNativePgOperatorHelmScore {
    // Namespace the operator is installed into; must not be blank.
    pub namespace: String,
}

impl Default for CloudNativePgOperatorHelmScore {
    fn default() -> Self {
        Self {
            // Conventional namespace used by the upstream CNPG Helm chart.
            namespace: "cnpg-system".to_string(),
        }
    }
}

impl<T: Topology + K8sclient + HelmCommand + 'static> Score<T> for CloudNativePgOperatorHelmScore {
    fn name(&self) -> String {
        format!("CloudNativePgOperatorHelmScore({})", self.namespace)
    }

    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
        let cnpg_helm_score = HelmChartScore {
            // `expect` instead of a bare `unwrap`: a blank namespace is a
            // caller error, and the panic message should say which field.
            namespace: Some(
                NonBlankString::from_str(&self.namespace)
                    .expect("CloudNativePgOperatorHelmScore.namespace must not be blank"),
            ),
            release_name: NonBlankString::from_str("cloudnative-pg")
                .expect("static release name is non-blank"),
            chart_name: NonBlankString::from_str(
                "oci://ghcr.io/cloudnative-pg/charts/cloudnative-pg",
            )
            .expect("static chart name is non-blank"),
            chart_version: None,
            values_overrides: None,
            values_yaml: None,
            create_namespace: true,
            // Install only (no upgrade) so an existing operator is left alone.
            install_only: true,
            // OCI charts are addressed directly; no repository entry needed.
            repository: None,
        };
        cnpg_helm_score.create_interpret()
    }
}

View File

@@ -1,24 +1,35 @@
use crate::data::Version; use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}; use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory; use crate::inventory::Inventory;
use crate::modules::k8s::resource::K8sResourceScore; use crate::modules::k8s::resource::K8sResourceScore;
use crate::modules::postgresql::capability::PostgreSQLConfig; use crate::modules::postgresql::capability::PostgreSQLConfig;
use crate::modules::postgresql::cnpg::{ use crate::modules::postgresql::cnpg::{
Bootstrap, Cluster, ClusterSpec, ExternalCluster, Initdb, PgBaseBackup, ReplicaSpec, Bootstrap, Cluster, ClusterSpec, ExternalCluster, Initdb, PgBaseBackup, ReplicaSpec,
SecretKeySelector, Storage, SecretKeySelector, Storage,
}; };
use crate::modules::postgresql::operator::{
CloudNativePgOperatorHelmScore, CloudNativePgOperatorScore,
};
use crate::score::Score; use crate::score::Score;
use crate::topology::{K8sclient, Topology}; use crate::topology::{HelmCommand, K8sclient, Topology};
use async_trait::async_trait; use async_trait::async_trait;
use harmony_k8s::KubernetesDistribution;
use harmony_types::id::Id; use harmony_types::id::Id;
use k8s_openapi::ByteString; use k8s_openapi::ByteString;
use k8s_openapi::api::core::v1::Secret; use k8s_openapi::api::core::v1::{Pod, Secret};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use log::{info, warn};
use serde::Serialize; use serde::Serialize;
/// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG. /// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG.
/// ///
/// This score automatically ensures the CloudNativePG (CNPG) operator is installed
/// before creating the Cluster CRD. The installation method depends on the Kubernetes
/// distribution:
///
/// - **OpenShift/OKD**: Uses OperatorHub Subscription via `CloudNativePgOperatorScore`
/// - **K3s/Other**: Uses Helm chart via `CloudNativePgOperatorHelmScore`
///
/// # Usage /// # Usage
/// ``` /// ```
/// use harmony::modules::postgresql::PostgreSQLScore; /// use harmony::modules::postgresql::PostgreSQLScore;
@@ -26,12 +37,7 @@ use serde::Serialize;
/// ``` /// ```
/// ///
/// # Limitations (Happy Path) /// # Limitations (Happy Path)
/// - Requires CNPG operator installed (use CloudNativePgOperatorScore).
/// - No backups, monitoring, extensions configured. /// - No backups, monitoring, extensions configured.
///
/// TODO : refactor this to declare a clean dependency on cnpg operator. Then cnpg operator will
/// self-deploy either using operatorhub or helm chart depending on k8s flavor. This is cnpg
/// specific behavior
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct K8sPostgreSQLScore { pub struct K8sPostgreSQLScore {
pub config: PostgreSQLConfig, pub config: PostgreSQLConfig,
@@ -56,7 +62,7 @@ impl K8sPostgreSQLScore {
} }
} }
impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore { impl<T: Topology + K8sclient + HelmCommand + 'static> Score<T> for K8sPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> { fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(K8sPostgreSQLInterpret { Box::new(K8sPostgreSQLInterpret {
config: self.config.clone(), config: self.config.clone(),
@@ -73,13 +79,127 @@ pub struct K8sPostgreSQLInterpret {
config: PostgreSQLConfig, config: PostgreSQLConfig,
} }
impl K8sPostgreSQLInterpret {
    /// Ensures `self.config.namespace` exists and is Active before the
    /// Cluster CRD is applied.
    ///
    /// Checks existence first, creates the namespace if absent, then waits
    /// up to 30 seconds for it to become ready. All k8s errors are wrapped
    /// in `InterpretError` with a message naming the failing step.
    async fn ensure_namespace<T: Topology + K8sclient>(
        &self,
        topology: &T,
    ) -> Result<(), InterpretError> {
        let k8s_client = topology
            .k8s_client()
            .await
            .map_err(|e| InterpretError::new(format!("Failed to get k8s client: {}", e)))?;
        let namespace_name = &self.config.namespace;
        if k8s_client
            .namespace_exists(namespace_name)
            .await
            .map_err(|e| {
                InterpretError::new(format!(
                    "Failed to check namespace '{}': {}",
                    namespace_name, e
                ))
            })?
        {
            info!("Namespace '{}' already exists", namespace_name);
            return Ok(());
        }
        info!("Creating namespace '{}'", namespace_name);
        k8s_client
            .create_namespace(namespace_name)
            .await
            .map_err(|e| {
                InterpretError::new(format!(
                    "Failed to create namespace '{}': {}",
                    namespace_name, e
                ))
            })?;
        k8s_client
            .wait_for_namespace(namespace_name, Some(std::time::Duration::from_secs(30)))
            .await
            .map_err(|e| {
                InterpretError::new(format!("Namespace '{}' not ready: {}", namespace_name, e))
            })?;
        info!("Namespace '{}' is ready", namespace_name);
        Ok(())
    }

    /// Ensures the CNPG operator is installed and its deployment is ready.
    ///
    /// Detection: lists pods labeled `app.kubernetes.io/name=cloudnative-pg`
    /// cluster-wide; any match is taken to mean the operator is installed.
    /// If absent, installs via OperatorHub Subscription on the OpenShift
    /// family, or via the Helm chart score otherwise, then waits up to 120s
    /// for the `cloudnative-pg` deployment in `cnpg-system`.
    ///
    /// NOTE(review): the readiness wait hardcodes namespace "cnpg-system",
    /// which matches the Helm score's default, but the OperatorHub
    /// Subscription path may install the operator elsewhere — confirm the
    /// deployment location for that branch.
    async fn ensure_cnpg_operator<T: Topology + K8sclient + HelmCommand + 'static>(
        &self,
        topology: &T,
    ) -> Result<(), InterpretError> {
        let k8s_client = topology
            .k8s_client()
            .await
            .map_err(|e| InterpretError::new(format!("Failed to get k8s client: {}", e)))?;
        let pods = k8s_client
            .list_all_resources_with_labels::<Pod>("app.kubernetes.io/name=cloudnative-pg")
            .await
            .map_err(|e| {
                InterpretError::new(format!("Failed to list CNPG operator pods: {}", e))
            })?;
        if !pods.is_empty() {
            info!("CNPG operator is already installed");
            return Ok(());
        }
        warn!("CNPG operator not found, installing...");
        // Installation method is distribution-specific (see match below).
        let distro = k8s_client.get_k8s_distribution().await.map_err(|e| {
            InterpretError::new(format!("Failed to detect k8s distribution: {}", e))
        })?;
        match distro {
            KubernetesDistribution::OpenshiftFamily => {
                info!("Installing CNPG operator via OperatorHub Subscription");
                let score = CloudNativePgOperatorScore::default_openshift();
                score
                    .interpret(&Inventory::empty(), topology)
                    .await
                    .map_err(|e| {
                        InterpretError::new(format!("Failed to install CNPG operator: {}", e))
                    })?;
            }
            KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => {
                info!("Installing CNPG operator via Helm chart");
                let score = CloudNativePgOperatorHelmScore::default();
                score
                    .interpret(&Inventory::empty(), topology)
                    .await
                    .map_err(|e| {
                        InterpretError::new(format!("Failed to install CNPG operator: {}", e))
                    })?;
            }
        }
        k8s_client
            .wait_until_deployment_ready(
                "cloudnative-pg",
                Some("cnpg-system"),
                Some(std::time::Duration::from_secs(120)),
            )
            .await
            .map_err(|e| InterpretError::new(format!("CNPG operator not ready: {}", e)))?;
        info!("CNPG operator is ready");
        Ok(())
    }
}
#[async_trait] #[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for K8sPostgreSQLInterpret { impl<T: Topology + K8sclient + HelmCommand + 'static> Interpret<T> for K8sPostgreSQLInterpret {
async fn execute( async fn execute(
&self, &self,
inventory: &Inventory, inventory: &Inventory,
topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
self.ensure_cnpg_operator(topology).await?;
self.ensure_namespace(topology).await?;
match &self.config.role { match &self.config.role {
super::capability::PostgreSQLClusterRole::Primary => { super::capability::PostgreSQLClusterRole::Primary => {
let metadata = ObjectMeta { let metadata = ObjectMeta {

View File

@@ -5,7 +5,7 @@ use directories::ProjectDirs;
use interactive_parse::InteractiveParseObj; use interactive_parse::InteractiveParseObj;
use log::debug; use log::debug;
use schemars::JsonSchema; use schemars::JsonSchema;
use serde::{de::DeserializeOwned, Serialize}; use serde::{Serialize, de::DeserializeOwned};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use thiserror::Error; use thiserror::Error;
@@ -76,11 +76,10 @@ impl ConfigManager {
pub async fn get<T: Config>(&self) -> Result<T, ConfigError> { pub async fn get<T: Config>(&self) -> Result<T, ConfigError> {
for source in &self.sources { for source in &self.sources {
if let Some(value) = source.get(T::KEY).await? { if let Some(value) = source.get(T::KEY).await? {
let config: T = serde_json::from_value(value).map_err(|e| { let config: T =
ConfigError::Deserialization { serde_json::from_value(value).map_err(|e| ConfigError::Deserialization {
key: T::KEY.to_string(), key: T::KEY.to_string(),
source: e, source: e,
}
})?; })?;
debug!("Retrieved config for key {} from source", T::KEY); debug!("Retrieved config for key {} from source", T::KEY);
return Ok(config); return Ok(config);
@@ -95,17 +94,20 @@ impl ConfigManager {
match self.get::<T>().await { match self.get::<T>().await {
Ok(config) => Ok(config), Ok(config) => Ok(config),
Err(ConfigError::NotFound { .. }) => { Err(ConfigError::NotFound { .. }) => {
let config = T::parse_to_obj() let config =
.map_err(|e| ConfigError::PromptError(e.to_string()))?; T::parse_to_obj().map_err(|e| ConfigError::PromptError(e.to_string()))?;
for source in &self.sources { for source in &self.sources {
if let Err(e) = source if let Err(e) = source
.set(T::KEY, &serde_json::to_value(&config).map_err(|e| { .set(
T::KEY,
&serde_json::to_value(&config).map_err(|e| {
ConfigError::Serialization { ConfigError::Serialization {
key: T::KEY.to_string(), key: T::KEY.to_string(),
source: e, source: e,
} }
})?) })?,
)
.await .await
{ {
debug!("Failed to save config to source: {e}"); debug!("Failed to save config to source: {e}");
@@ -175,8 +177,35 @@ mod tests {
use super::*; use super::*;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
static ENV_LOCK: Mutex<()> = Mutex::new(());

/// Builds a unique `HARMONY_CONFIG_{key}_{id}` variable name and either sets
/// it to `value` or removes it, returning the generated name.
fn setup_env_vars(key: &str, value: Option<&str>) -> String {
    // Monotonic counter keeps names unique across tests in one process.
    let unique_id = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
    let var_name = format!("HARMONY_CONFIG_{}_{}", key, unique_id);
    // Env mutation is process-global; callers serialize via ENV_LOCK.
    unsafe {
        match value {
            Some(v) => std::env::set_var(&var_name, v),
            None => std::env::remove_var(&var_name),
        }
    }
    var_name
}
/// Runs `f` on a freshly spawned thread and blocks until it finishes,
/// surfacing any panic from the closure as a test failure.
fn run_in_isolated_env<F: FnOnce() + Send + 'static>(f: F) {
    std::thread::spawn(f).join().expect("Test thread panicked");
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct TestConfig { struct TestConfig {
name: String, name: String,
@@ -339,18 +368,14 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_env_source_reads_from_environment() { async fn test_env_source_reads_from_environment() {
unsafe { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
std::env::set_var( let env_var = setup_env_vars("TestConfig", Some(r#"{"name":"from_env","count":7}"#));
"HARMONY_CONFIG_TestConfig",
r#"{"name":"from_env","count":7}"#,
);
}
let source = EnvSource; let source = EnvSource;
let result = source.get("TestConfig").await; let result = source.get(&env_var.replace("HARMONY_CONFIG_", "")).await;
unsafe { unsafe {
std::env::remove_var("HARMONY_CONFIG_TestConfig"); std::env::remove_var(&env_var);
} }
let value = result.unwrap().unwrap(); let value = result.unwrap().unwrap();
@@ -361,26 +386,32 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_env_source_returns_none_when_not_set() { async fn test_env_source_returns_none_when_not_set() {
unsafe { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
std::env::remove_var("HARMONY_CONFIG_TestConfig"); run_in_isolated_env(|| {
} let env_var = setup_env_vars("TestConfig", None);
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let source = EnvSource; let source = EnvSource;
let result = source.get("TestConfig").await.unwrap(); let result = source.get(&env_var.replace("HARMONY_CONFIG_", "")).await;
assert!(result.is_none()); assert!(result.unwrap().is_none());
});
});
} }
#[tokio::test] #[tokio::test]
async fn test_env_source_returns_error_for_invalid_json() { async fn test_env_source_returns_error_for_invalid_json() {
unsafe { let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
std::env::set_var("HARMONY_CONFIG_TestConfig", "not valid json"); let env_var = setup_env_vars("TestConfig", Some("not valid json"));
}
let source = EnvSource; let source = EnvSource;
let result = source.get("TestConfig").await; let result = source.get(&env_var.replace("HARMONY_CONFIG_", "")).await;
unsafe { unsafe {
std::env::remove_var("HARMONY_CONFIG_TestConfig"); std::env::remove_var(&env_var);
} }
assert!(result.is_err()); assert!(result.is_err());

View File

@@ -1,5 +1,5 @@
use async_trait::async_trait;
use crate::{ConfigError, ConfigSource}; use crate::{ConfigError, ConfigSource};
use async_trait::async_trait;
pub struct EnvSource; pub struct EnvSource;
@@ -13,14 +13,12 @@ impl ConfigSource for EnvSource {
let env_key = env_key_for(key); let env_key = env_key_for(key);
match std::env::var(&env_key) { match std::env::var(&env_key) {
Ok(value) => { Ok(value) => serde_json::from_str(&value).map(Some).map_err(|e| {
serde_json::from_str(&value) ConfigError::EnvError(format!(
.map(Some)
.map_err(|e| ConfigError::EnvError(format!(
"Invalid JSON in environment variable {}: {}", "Invalid JSON in environment variable {}: {}",
env_key, e env_key, e
))) ))
} }),
Err(std::env::VarError::NotPresent) => Ok(None), Err(std::env::VarError::NotPresent) => Ok(None),
Err(e) => Err(ConfigError::EnvError(format!( Err(e) => Err(ConfigError::EnvError(format!(
"Failed to read environment variable {}: {}", "Failed to read environment variable {}: {}",
@@ -31,8 +29,7 @@ impl ConfigSource for EnvSource {
async fn set(&self, key: &str, value: &serde_json::Value) -> Result<(), ConfigError> { async fn set(&self, key: &str, value: &serde_json::Value) -> Result<(), ConfigError> {
let env_key = env_key_for(key); let env_key = env_key_for(key);
let json_string = serde_json::to_string(value) let json_string = serde_json::to_string(value).map_err(|e| ConfigError::Serialization {
.map_err(|e| ConfigError::Serialization {
key: key.to_string(), key: key.to_string(),
source: e, source: e,
})?; })?;

View File

@@ -29,10 +29,11 @@ impl ConfigSource for LocalFileSource {
match fs::read(&path).await { match fs::read(&path).await {
Ok(contents) => { Ok(contents) => {
let value: serde_json::Value = serde_json::from_slice(&contents) let value: serde_json::Value = serde_json::from_slice(&contents).map_err(|e| {
.map_err(|e| ConfigError::Deserialization { ConfigError::Deserialization {
key: key.to_string(), key: key.to_string(),
source: e, source: e,
}
})?; })?;
Ok(Some(value)) Ok(Some(value))
} }
@@ -48,8 +49,8 @@ impl ConfigSource for LocalFileSource {
fs::create_dir_all(&self.base_path).await?; fs::create_dir_all(&self.base_path).await?;
let path = self.file_path_for(key); let path = self.file_path_for(key);
let contents = serde_json::to_string_pretty(value) let contents =
.map_err(|e| ConfigError::Serialization { serde_json::to_string_pretty(value).map_err(|e| ConfigError::Serialization {
key: key.to_string(), key: key.to_string(),
source: e, source: e,
})?; })?;

View File

@@ -18,7 +18,9 @@ impl PromptSource {
#[allow(dead_code)] #[allow(dead_code)]
pub fn with_writer(writer: Arc<dyn std::io::Write + Send + Sync>) -> Self { pub fn with_writer(writer: Arc<dyn std::io::Write + Send + Sync>) -> Self {
Self { writer: Some(writer) } Self {
writer: Some(writer),
}
} }
} }

View File

@@ -19,8 +19,8 @@ impl<S: SecretStore + 'static> ConfigSource for StoreSource<S> {
async fn get(&self, key: &str) -> Result<Option<serde_json::Value>, ConfigError> { async fn get(&self, key: &str) -> Result<Option<serde_json::Value>, ConfigError> {
match self.store.get_raw(&self.namespace, key).await { match self.store.get_raw(&self.namespace, key).await {
Ok(bytes) => { Ok(bytes) => {
let value: serde_json::Value = serde_json::from_slice(&bytes) let value: serde_json::Value =
.map_err(|e| ConfigError::Deserialization { serde_json::from_slice(&bytes).map_err(|e| ConfigError::Deserialization {
key: key.to_string(), key: key.to_string(),
source: e, source: e,
})?; })?;

View File

@@ -1,7 +1,7 @@
use proc_macro::TokenStream; use proc_macro::TokenStream;
use proc_macro_crate::{crate_name, FoundCrate}; use proc_macro_crate::{FoundCrate, crate_name};
use quote::quote; use quote::quote;
use syn::{parse_macro_input, DeriveInput, Ident}; use syn::{DeriveInput, Ident, parse_macro_input};
#[proc_macro_derive(Config)] #[proc_macro_derive(Config)]
pub fn derive_config(input: TokenStream) -> TokenStream { pub fn derive_config(input: TokenStream) -> TokenStream {