Compare commits

...

2 Commits

Author SHA1 Message Date
c6c53b1117 wip: created basic struct for separation natsjetstream config from nats 2026-02-11 11:09:11 -05:00
0b9499bc97 doc: adr 019 2026-02-06 15:32:50 -05:00
10 changed files with 370 additions and 27 deletions

View File

@@ -0,0 +1,86 @@
Initial Date: 2025-02-06
## Status
Proposed
## Context
The Harmony Agent requires a persistent connection to the NATS Supercluster to perform Key-Value (KV) operations (Read/Write/Watch).
Service Requirements: The agent must authenticate with sufficient privileges to manage KV buckets and interact with the JetStream API.
Infrastructure: NATS is deployed as a multi-site Supercluster. Authentication must be consistent across sites to allow for agent failover and data replication.
https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro
Technical Constraint: In NATS, JetStream functionality is not global by default; it must be explicitly enabled and capped at the Account level to allow KV bucket creation and persistence.
## Issues
1. The "System Account" Trap
The Hole: Using the system account for the Harmony Agent.
The Risk: The NATS System Account is for server heartbeat and monitoring. It cannot (and should not) own JetStream KV buckets.
2. Multi-Site Authorization Sync
The Hole: Defining users in local nats.conf files via Helm.
The Risk: If an agent at Site-2 fails over to Site-3, but Site-3's local configuration doesn't have the testUser credentials, the agent will be locked out during an outage.
3. KV Replication Factor
The Hole: Not specifying the Replicas count for the KV bucket.
The Risk: If you create a KV bucket with the default (1 replica), it only exists at the site where it was created. If that site goes down, the data is lost despite having a Supercluster.
4. Subject-Level Permissions
The Hole: Only granting TEST.* permissions.
The Risk: NATS KV uses internal subjects (e.g., $KV.<bucket_name>.>). Without access to these, the agent will get an "Authorization Violation" even if it's logged in.
## Proposed Solution
To enable reliable, secure communication between the Harmony Agent and the NATS Supercluster, we will implement Account-isolated JetStream using NKey Authentication (or mTLS).
1. Dedicated Account Architecture
We will move away from the "Global/Default" account. A dedicated HARMONY account will be defined identically across all sites in the Supercluster. This ensures that the metadata for the KV bucket can replicate across the gateways.
System Account: Reserved for NATS internal health and Supercluster routing.
Harmony Account: Dedicated to Harmony Agent data, with JetStream explicitly enabled.
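The dedicated-account layout above could be sketched in each site's NATS server configuration roughly as follows (account names, user names, limits, and environment variables are illustrative assumptions, not taken from the actual deployment):

```conf
# Identical accounts block replicated verbatim to every site in the Supercluster
accounts {
  SYS: {
    # Reserved for NATS internal health and Supercluster routing
    users: [ { user: sys, password: $SYS_PASSWORD } ]
  }
  HARMONY: {
    # JetStream must be explicitly enabled (and capped) at the account
    # level, otherwise KV bucket creation fails
    jetstream: {
      max_mem: 1G
      max_file: 10G
    }
    users: [ { user: harmony-agent, password: $HARMONY_PASSWORD } ]
  }
}
system_account: SYS
```

Because the block is identical on every site, the KV bucket metadata can replicate across the gateways and an agent failing over to another site authenticates with the same credentials.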
2. Authentication: Use the Harmony secret store mounted into the NATS container
This takes advantage of the currently implemented secret-management solution.
3. JetStream & KV Configuration
To ensure the KV bucket is available across the Supercluster, the following configuration must be applied:
Replication Factor (R=3): KV buckets will be created with a replication factor of 3 to ensure data persists across Site-1, Site-2, and Site-3.
Permissions: The agent will be granted scoped access to:
$KV.HARMONY.> (Data operations)
$JS.API.CONSUMER.> and $JS.API.STREAM.> (Management operations)
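The scoped permissions above could look roughly like this in the HARMONY account's user definition (a sketch; subject lists are illustrative, and KV request/reply traffic also needs a subscribe permission on the reply inbox):

```conf
users: [
  { user: harmony-agent, password: $HARMONY_PASSWORD,
    permissions: {
      # Data operations on the KV bucket plus JetStream management API
      publish: [ "$KV.HARMONY.>", "$JS.API.CONSUMER.>", "$JS.API.STREAM.>" ]
      # Reply inboxes are required for JetStream API request/reply
      subscribe: [ "_INBOX.>" ]
    }
  }
]
```

The bucket itself would then be created with something like `nats kv add HARMONY --replicas 3` via the natscli, so the stream backing the bucket spans Site-1, Site-2, and Site-3.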
## Consequence of Decision
Pros
Resilience: Agents can fail over to any site in the Supercluster and find their credentials and data.
Security: By using a dedicated account, the Harmony Agent cannot see or interfere with NATS system traffic.
Scalability: We can add Site-4 or Site-5 simply by copying the HARMONY account definition.
Cons / Risks
Configuration Drift: If one site's ConfigMap is updated without the others, authentication will fail during a site failover.
Complexity: Requires a "Management" step to ensure the account exists on all NATS instances before the agent attempts to connect.

View File

@@ -151,7 +151,14 @@ impl TlsRouter for K8sAnywhereTopology {
}
}
KubernetesDistribution::K3sFamily => todo!(),
KubernetesDistribution::Default => todo!(),
KubernetesDistribution::Default => {
warn!(
"unimpleneted see example https://kubernetes.github.io/ingress-nginx/user-guide/cli-arguments/ and create ingress manually "
);
Ok(Some(
"This section is in todo and must be created manually".to_string(),
))
}
}
}

View File

@@ -3,8 +3,9 @@ use async_trait::async_trait;
use crate::{
inventory::Inventory,
modules::nats::{
capability::{Nats, NatsCluster},
score_nats_k8s::NatsK8sScore,
capability::{
JetstreamConfig, JetstreamUserConfig, Nats, NatsCluster, NatsCredentials, NatsJetstream,
},
score_nats_enable_jetstream::NatsK8sEnableJetstreamScore,
score_nats_jetstream::NatsK8sJetstreamScore,
score_nats_jetstream_credentials::NatsK8sJetstreamCredentialsScore,
score_nats_k8s::NatsK8sScore,
},
score::Score,
topology::K8sAnywhereTopology,
@@ -36,3 +37,31 @@ impl Nats for K8sAnywhereTopology {
))
}
}
#[async_trait]
impl NatsJetstream for K8sAnywhereTopology {
async fn enable_jetstream(&self, _jetstream_config: &JetstreamConfig) -> Result<String, String> {
// TODO: pass the config through once NatsK8sEnableJetstreamScore accepts one
NatsK8sEnableJetstreamScore {}
.interpret(&Inventory::empty(), self)
.await
.map_err(|e| format!("Failed to enable jetstream: {}", e))?;
Ok("Nats jetstream enabled".to_string())
}
async fn create_jetstream_credentials(
&self,
_jetstream_user_config: &JetstreamUserConfig,
) -> Result<String, String> {
// TODO: pass the user config through once NatsK8sJetstreamCredentialsScore accepts one
NatsK8sJetstreamCredentialsScore {}
.interpret(&Inventory::empty(), self)
.await
.map_err(|e| format!("Failed to create jetstream credentials: {}", e))?;
Ok("Nats jetstream user credentials configured".to_string())
}
async fn get_jetstream_credentials(&self) -> Result<NatsCredentials, String> {
todo!()
}
}

View File

@@ -1,7 +1,9 @@
use std::fmt::Display;
use async_trait::async_trait;
use serde::Serialize;
use harmony_secret::Secret;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[async_trait]
pub trait Nats {
@@ -13,6 +15,19 @@ pub trait Nats {
) -> Result<String, String>;
}
// See ADR 019: NATS credentials management
#[async_trait]
pub trait NatsJetstream: Nats {
async fn enable_jetstream(&self, jetstream_config: &JetstreamConfig) -> Result<String, String>;
async fn create_jetstream_credentials(
&self,
jetstream_user_config: &JetstreamUserConfig,
) -> Result<String, String>;
async fn get_jetstream_credentials(&self) -> Result<NatsCredentials, String>;
}
#[async_trait]
pub trait NatsSupercluster {
async fn deploy_site(
@@ -51,3 +66,26 @@ impl Display for NatsEndpoint {
write!(f, "({})", self.host,)
}
}
#[derive(Debug, Clone, Serialize)]
pub struct JetstreamConfig {
store_dir: String,
max_memory_store: i64, // bytes; -1 for unlimited
max_file_store: i64,   // bytes; -1 for unlimited
domain: String,
max_buffered_msgs: i64,
max_buffered_size: i64, // bytes
request_queue_limit: u64,
sync_interval: u64, // seconds
strict: bool,
unique_tag: String,
}
#[derive(Debug, Clone, Serialize)]
pub struct JetstreamUserConfig {}
#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq, Clone)]
pub struct NatsCredentials {
pub user: String,
pub password: String,
}

View File

@@ -3,3 +3,6 @@ pub mod decentralized;
pub mod pki;
pub mod score_nats_k8s;
pub mod score_nats_supercluster;
pub mod score_nats_jetstream;
pub mod score_nats_jetstream_credentials;
pub mod score_nats_enable_jetstream;

View File

@@ -63,7 +63,7 @@ where
ca_issuer_name.into(),
None,
Some(vec![cluster.dns_name.clone()]),
Some(false),
Some(true),
&root_ca_config,
)
.await

View File

@@ -0,0 +1,55 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use serde::Serialize;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::nats::capability::NatsJetstream,
score::Score,
topology::Topology,
};
#[derive(Debug, Clone, Serialize)]
pub struct NatsK8sEnableJetstreamScore {}
impl<T: Topology> Score<T> for NatsK8sEnableJetstreamScore {
fn name(&self) -> String {
"NatsK8sEnableJetstreamScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(NatsK8sEnableJetstreamInterpret {})
}
}
#[derive(Debug)]
pub struct NatsK8sEnableJetstreamInterpret {}
#[async_trait]
impl<T: Topology> Interpret<T> for NatsK8sEnableJetstreamInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
todo!()
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("NatsK8sEnableJetstreamInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -0,0 +1,69 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use log::info;
use serde::Serialize;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::nats::capability::{JetstreamConfig, JetstreamUserConfig, NatsJetstream},
score::Score,
topology::Topology,
};
#[derive(Debug, Clone, Serialize)]
pub struct NatsK8sJetstreamScore {
jetstream_config: JetstreamConfig,
jetstream_user_config: JetstreamUserConfig,
}
impl<T: Topology + NatsJetstream> Score<T> for NatsK8sJetstreamScore {
fn name(&self) -> String {
"NatsK8sJetstreamScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(NatsK8sJetstreamInterpret {
score: self.clone(),
})
}
}
#[derive(Debug)]
pub struct NatsK8sJetstreamInterpret {
score: NatsK8sJetstreamScore,
}
#[async_trait]
impl<T: Topology + NatsJetstream> Interpret<T> for NatsK8sJetstreamInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!("enabling jetstream for nats cluster");
topology.enable_jetstream(&self.score.jetstream_config).await?;
info!("creating jetstream user credentials");
topology.create_jetstream_credentials(&self.score.jetstream_user_config).await?;
Ok(Outcome::success(
"Successfully enabled jetstream and created user credentials".to_string(),
))
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("NatsK8sJetstreamInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -0,0 +1,54 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use serde::Serialize;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::Topology,
};
#[derive(Debug, Clone, Serialize)]
pub struct NatsK8sJetstreamCredentialsScore {}
impl<T: Topology> Score<T> for NatsK8sJetstreamCredentialsScore {
fn name(&self) -> String {
"NatsK8sJetstreamCredentialsScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(NatsK8sJetstreamCredentialsInterpret {})
}
}
#[derive(Debug, Clone, Serialize)]
pub struct NatsK8sJetstreamCredentialsInterpret {}
#[async_trait]
impl<T: Topology> Interpret<T> for NatsK8sJetstreamCredentialsInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
todo!()
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("NatsK8sJetstreamCredentialsInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -6,7 +6,7 @@ use harmony_secret::{Secret, SecretManager};
use harmony_types::id::Id;
use k8s_openapi::{ByteString, api::core::v1::Secret as K8sSecret};
use kube::api::ObjectMeta;
use log::{debug, info};
use log::{debug, info, warn};
use non_blank_string_rs::NonBlankString;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
@@ -18,7 +18,7 @@ use crate::{
modules::{
helm::chart::{HelmChartScore, HelmRepository},
k8s::{ingress::K8sIngressScore, resource::K8sResourceScore},
nats::capability::{Nats, NatsCluster, NatsEndpoint},
nats::capability::{Nats, NatsCluster, NatsCredentials, NatsEndpoint},
okd::{
crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig},
route::OKDRouteScore,
@@ -36,7 +36,7 @@ pub struct NatsK8sScore {
pub ca_bundle: Option<Vec<String>>,
}
impl<T: Topology + HelmCommand + Nats + K8sclient + TlsRouter + 'static> Score<T> for NatsK8sScore {
impl<T: Topology + HelmCommand + K8sclient + TlsRouter + 'static> Score<T> for NatsK8sScore {
fn name(&self) -> String {
"NatsK8sScore".to_string()
}
@@ -53,7 +53,7 @@ pub struct NatsK8sInterpret {
}
#[async_trait]
impl<T: Topology + HelmCommand + Nats + K8sclient + TlsRouter + 'static> Interpret<T>
impl<T: Topology + HelmCommand + K8sclient + TlsRouter + 'static> Interpret<T>
for NatsK8sInterpret
{
async fn execute(
@@ -153,19 +153,25 @@ impl NatsK8sInterpret {
.await
}
KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => {
warn!("
https://kubernetes.github.io/ingress-nginx/user-guide/tls/#ssl-passthrough
for now tls router s not impleneted for a standard kubernetes cluster. an ingress must be manually created with the annotation
nginx.ingress.kubernetes.io/ssl-passthrough: \"true\"
");
Ok(Outcome::noop("noop".to_string()))
//TODO untested
K8sIngressScore {
name: todo!(),
host: todo!(),
backend_service: todo!(),
port: todo!(),
path: todo!(),
path_type: todo!(),
namespace: todo!(),
ingress_class_name: todo!(),
}
.interpret(inventory, topology)
.await
// K8sIngressScore {
// name: todo!(),
// host: todo!(),
// backend_service: todo!(),
// port: todo!(),
// path: todo!(),
// path_type: todo!(),
// namespace: todo!(),
// ingress_class_name: todo!(),
// }
// .interpret(inventory, topology)
// .await
}
}
}
@@ -221,7 +227,7 @@ impl NatsK8sInterpret {
namespace: String,
) -> Result<Outcome, InterpretError> {
let mut gateway_gateways = String::new();
let admin = SecretManager::get_or_prompt::<NatsAdmin>().await.unwrap();
let admin = SecretManager::get_or_prompt::<NatsCredentials>().await.unwrap();
let user = admin.user.clone();
let password = admin.password.clone();
@@ -304,6 +310,7 @@ service:
ports:
gateway:
enabled: true
publishNotReadyAddresses: true
tlsCA:
enabled: true
secretName: {supercluster_ca_secret_name}
@@ -343,8 +350,3 @@ natsBox:
}
}
#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq, Clone)]
struct NatsAdmin {
user: String,
password: String,
}