Compare commits
2 Commits
master
...
feat/nats-
| Author | SHA1 | Date | |
|---|---|---|---|
| c6c53b1117 | |||
| 0b9499bc97 |
86
adr/019-Nats-credentials-management.md
Normal file
86
adr/019-Nats-credentials-management.md
Normal file
@@ -0,0 +1,86 @@
|
||||
Initial Date: 2025-02-06
|
||||
|
||||
## Status
|
||||
|
||||
Proposed
|
||||
|
||||
## Context
|
||||
|
||||
The Harmony Agent requires a persistent connection to the NATS Supercluster to perform Key-Value (KV) operations (Read/Write/Watch).
|
||||
|
||||
Service Requirements: The agent must authenticate with sufficient privileges to manage KV buckets and interact with the JetStream API.
|
||||
|
||||
Infrastructure: NATS is deployed as a multi-site Supercluster. Authentication must be consistent across sites to allow for agent failover and data replication.
|
||||
|
||||
https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro
|
||||
|
||||
Technical Constraint: In NATS, JetStream functionality is not global by default; it must be explicitly enabled and capped at the Account level to allow KV bucket creation and persistence.
|
||||
|
||||
## Issues
|
||||
|
||||
1. The "System Account" Trap
|
||||
|
||||
The Hole: Using the system account for the Harmony Agent.
|
||||
|
||||
The Risk: The NATS System Account is for server heartbeat and monitoring. It cannot (and should not) own JetStream KV buckets.
|
||||
|
||||
2. Multi-Site Authorization Sync
|
||||
|
||||
The Hole: Defining users in local nats.conf files via Helm.
|
||||
|
||||
The Risk: If an agent at Site-2 fails over to Site-3, but Site-3’s local configuration doesn't have the testUser credentials, the agent will be locked out during an outage.
|
||||
|
||||
3. KV Replication Factor
|
||||
|
||||
The Hole: Not specifying the Replicas count for the KV bucket.
|
||||
|
||||
The Risk: If you create a KV bucket with the default (1 replica), it only exists at the site where it was created. If that site goes down, the data is lost despite having a Supercluster.
|
||||
|
||||
4. Subject-Level Permissions
|
||||
|
||||
The Hole: Only granting TEST.* permissions.
|
||||
|
||||
The Risk: NATS KV uses internal subjects (e.g., $KV.<bucket_name>.>). Without access to these, the agent will get an "Authorization Violation" even if it's logged in.
|
||||
|
||||
|
||||
## Proposed Solution
|
||||
|
||||
To enable reliable, secure communication between the Harmony Agent and the NATS Supercluster, we will implement Account-isolated JetStream using NKey Authentication (or mTLS).
|
||||
1. Dedicated Account Architecture
|
||||
|
||||
We will move away from the "Global/Default" account. A dedicated HARMONY account will be defined identically across all sites in the Supercluster. This ensures that the metadata for the KV bucket can replicate across the gateways.
|
||||
|
||||
System Account: Reserved for NATS internal health and Supercluster routing.
|
||||
|
||||
Harmony Account: Dedicated to Harmony Agent data, with JetStream explicitly enabled.
|
||||
|
||||
2. Authentication: Use harmony secret store mounted into nats container
|
||||
|
||||
Take advantage of currently implemented solution
|
||||
|
||||
3. JetStream & KV Configuration
|
||||
|
||||
To ensure the KV bucket is available across the Supercluster, the following configuration must be applied:
|
||||
|
||||
Replication Factor (R=3): KV buckets will be created with a replication factor of 3 to ensure data persists across Site-1, Site-2, and Site-3.
|
||||
|
||||
Permissions: The agent will be granted scoped access to:
|
||||
|
||||
$KV.HARMONY.> (Data operations)
|
||||
|
||||
$JS.API.CONSUMER.> and $JS.API.STREAM.> (Management operations)
|
||||
|
||||
## Consequence of Decision
|
||||
Pros
|
||||
|
||||
Resilience: Agents can fail over to any site in the Supercluster and find their credentials and data.
|
||||
|
||||
Security: By using a dedicated account, the Harmony Agent cannot see or interfere with NATS system traffic.
|
||||
|
||||
Scalability: We can add Site-4 or Site-5 simply by copying the HARMONY account definition.
|
||||
|
||||
Cons / Risks
|
||||
|
||||
Configuration Drift: If one site's ConfigMap is updated without the others, authentication will fail during a site failover.
|
||||
|
||||
Complexity: Requires a "Management" step to ensure the account exists on all NATS instances before the agent attempts to connect.
|
||||
@@ -151,7 +151,14 @@ impl TlsRouter for K8sAnywhereTopology {
|
||||
}
|
||||
}
|
||||
KubernetesDistribution::K3sFamily => todo!(),
|
||||
KubernetesDistribution::Default => todo!(),
|
||||
KubernetesDistribution::Default => {
|
||||
warn!(
|
||||
"unimpleneted see example https://kubernetes.github.io/ingress-nginx/user-guide/cli-arguments/ and create ingress manually "
|
||||
);
|
||||
Ok(Some(
|
||||
"This section is in todo and must be created manually".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,8 +3,9 @@ use async_trait::async_trait;
|
||||
use crate::{
|
||||
inventory::Inventory,
|
||||
modules::nats::{
|
||||
capability::{Nats, NatsCluster},
|
||||
score_nats_k8s::NatsK8sScore,
|
||||
capability::{
|
||||
JetstreamConfig, JetstreamUserConfig, Nats, NatsCluster, NatsCredentials, NatsJetstream,
|
||||
}, score_nats_enable_jetstream::NatsK8sEnableJetstreamScore, score_nats_jetstream::NatsK8sJetstreamScore, score_nats_jetstream_credentials::NatsK8sJetstreamCredentialsScore, score_nats_k8s::NatsK8sScore
|
||||
},
|
||||
score::Score,
|
||||
topology::K8sAnywhereTopology,
|
||||
@@ -36,3 +37,31 @@ impl Nats for K8sAnywhereTopology {
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl NatsJetstream for K8sAnywhereTopology {
|
||||
async fn enable_jetstream(&self, jetstream_config: &JetstreamConfig) -> Result<String, String> {
|
||||
NatsK8sEnableJetstreamScore {}
|
||||
.interpret(&Inventory::empty(), self)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to enable jetstream: {}", e))?;
|
||||
|
||||
Ok("Nats jetstream enabled".to_string())
|
||||
}
|
||||
|
||||
async fn create_jetstream_credentials(
|
||||
&self,
|
||||
jetstream_user_config: &JetstreamUserConfig,
|
||||
) -> Result<String, String> {
|
||||
NatsK8sJetstreamCredentialsScore {}
|
||||
.interpret(&Inventory::empty(), self)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to create credentials jetstream: {}", e))?;
|
||||
|
||||
Ok("Nats jetstream user credentials configured".to_string())
|
||||
}
|
||||
|
||||
async fn get_jetstream_credentials(&self) -> Result<NatsCredentials, String> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
use std::fmt::Display;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use serde::Serialize;
|
||||
use harmony_secret::Secret;
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[async_trait]
|
||||
pub trait Nats {
|
||||
@@ -13,6 +15,19 @@ pub trait Nats {
|
||||
) -> Result<String, String>;
|
||||
}
|
||||
|
||||
//ADR nats credentials management ->
|
||||
#[async_trait]
|
||||
pub trait NatsJetstream: Nats {
|
||||
async fn enable_jetstream(&self, jetstream_config: &JetstreamConfig) -> Result<String, String>;
|
||||
|
||||
async fn create_jetstream_credentials(
|
||||
&self,
|
||||
jetstream_user_config: &JetstreamUserConfig,
|
||||
) -> Result<String, String>;
|
||||
|
||||
async fn get_jetstream_credentials(&self) -> Result<NatsCredentials, String>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait NatsSupercluster {
|
||||
async fn deploy_site(
|
||||
@@ -51,3 +66,26 @@ impl Display for NatsEndpoint {
|
||||
write!(f, "({})", self.host,)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct JetstreamConfig {
|
||||
store_dir: String,
|
||||
max_memory_store: u8,
|
||||
max_file_store: u8,
|
||||
domain: String,
|
||||
max_buffered_msgs: u8,
|
||||
max_buffered_size: u8,
|
||||
request_qeue_limit: u8,
|
||||
sync_interval: u8,
|
||||
strict: bool,
|
||||
unique_tag: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct JetstreamUserConfig {}
|
||||
|
||||
#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq, Clone)]
|
||||
pub struct NatsCredentials {
|
||||
pub user: String,
|
||||
pub password: String,
|
||||
}
|
||||
|
||||
@@ -3,3 +3,6 @@ pub mod decentralized;
|
||||
pub mod pki;
|
||||
pub mod score_nats_k8s;
|
||||
pub mod score_nats_supercluster;
|
||||
pub mod score_nats_jetstream;
|
||||
pub mod score_nats_jetstream_credentials;
|
||||
pub mod score_nats_enable_jetstream;
|
||||
|
||||
@@ -63,7 +63,7 @@ where
|
||||
ca_issuer_name.into(),
|
||||
None,
|
||||
Some(vec![cluster.dns_name.clone()]),
|
||||
Some(false),
|
||||
Some(true),
|
||||
&root_ca_config,
|
||||
)
|
||||
.await
|
||||
|
||||
55
harmony/src/modules/nats/score_nats_enable_jetstream.rs
Normal file
55
harmony/src/modules/nats/score_nats_enable_jetstream.rs
Normal file
@@ -0,0 +1,55 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
modules::nats::capability::NatsJetstream,
|
||||
score::Score,
|
||||
topology::Topology,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct NatsK8sEnableJetstreamScore {}
|
||||
|
||||
impl<T: Topology > Score<T> for NatsK8sEnableJetstreamScore {
|
||||
fn name(&self) -> String {
|
||||
"NatsK8sEnableJetstreamScore".to_string()
|
||||
}
|
||||
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(NatsK8sEnableJetstreamInterpet {})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NatsK8sEnableJetstreamInterpet {}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology> Interpret<T> for NatsK8sEnableJetstreamInterpet {
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("NatsK8sEnableJetstreamInterpet")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
69
harmony/src/modules/nats/score_nats_jetstream.rs
Normal file
69
harmony/src/modules/nats/score_nats_jetstream.rs
Normal file
@@ -0,0 +1,69 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use log::info;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
modules::nats::capability::{JetstreamConfig, JetstreamUserConfig, NatsJetstream},
|
||||
score::Score,
|
||||
topology::Topology,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct NatsK8sJetstreamScore {
|
||||
jetstream_config: JetstreamConfig,
|
||||
jetstream_user_config: JetstreamUserConfig,
|
||||
}
|
||||
|
||||
impl<T: Topology + NatsJetstream> Score<T> for NatsK8sJetstreamScore {
|
||||
fn name(&self) -> String {
|
||||
"NatsK8sJetstreamScore".to_string()
|
||||
}
|
||||
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(NatsK8sJetstreamInterpet {
|
||||
score: self.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct NatsK8sJetstreamInterpet {
|
||||
score: NatsK8sJetstreamScore,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + NatsJetstream> Interpret<T> for NatsK8sJetstreamInterpet {
|
||||
async fn execute(
|
||||
&self,
|
||||
_inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!("enabling jetstream for nats cluster");
|
||||
topology.enable_jetstream(&self.score.jetstream_config).await?;
|
||||
info!("creating jetstream user credentials");
|
||||
topology.create_jetstream_credentials(&self.score.jetstream_user_config).await?;
|
||||
Ok(Outcome::success(
|
||||
"Successfully enabled jetstream and created user credentials".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("NatsK8sJetstreamInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
54
harmony/src/modules/nats/score_nats_jetstream_credentials.rs
Normal file
54
harmony/src/modules/nats/score_nats_jetstream_credentials.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
score::Score,
|
||||
topology::Topology,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct NatsK8sJetstreamCredentialsScore {}
|
||||
|
||||
impl<T: Topology> Score<T> for NatsK8sJetstreamCredentialsScore {
|
||||
fn name(&self) -> String {
|
||||
"NatsK8sJetstreamCredentialsScore".to_string()
|
||||
}
|
||||
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(NatsK8sJetstreamCredentialsInterpret {})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct NatsK8sJetstreamCredentialsInterpret {}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology> Interpret<T> for NatsK8sJetstreamCredentialsInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("NatsK8sJetstreamCredentialsInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,7 @@ use harmony_secret::{Secret, SecretManager};
|
||||
use harmony_types::id::Id;
|
||||
use k8s_openapi::{ByteString, api::core::v1::Secret as K8sSecret};
|
||||
use kube::api::ObjectMeta;
|
||||
use log::{debug, info};
|
||||
use log::{debug, info, warn};
|
||||
use non_blank_string_rs::NonBlankString;
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -18,7 +18,7 @@ use crate::{
|
||||
modules::{
|
||||
helm::chart::{HelmChartScore, HelmRepository},
|
||||
k8s::{ingress::K8sIngressScore, resource::K8sResourceScore},
|
||||
nats::capability::{Nats, NatsCluster, NatsEndpoint},
|
||||
nats::capability::{Nats, NatsCluster, NatsCredentials, NatsEndpoint},
|
||||
okd::{
|
||||
crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig},
|
||||
route::OKDRouteScore,
|
||||
@@ -36,7 +36,7 @@ pub struct NatsK8sScore {
|
||||
pub ca_bundle: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
impl<T: Topology + HelmCommand + Nats + K8sclient + TlsRouter + 'static> Score<T> for NatsK8sScore {
|
||||
impl<T: Topology + HelmCommand + K8sclient + TlsRouter + 'static> Score<T> for NatsK8sScore {
|
||||
fn name(&self) -> String {
|
||||
"NatsK8sScore".to_string()
|
||||
}
|
||||
@@ -53,7 +53,7 @@ pub struct NatsK8sInterpret {
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + HelmCommand + Nats + K8sclient + TlsRouter + 'static> Interpret<T>
|
||||
impl<T: Topology + HelmCommand + K8sclient + TlsRouter + 'static> Interpret<T>
|
||||
for NatsK8sInterpret
|
||||
{
|
||||
async fn execute(
|
||||
@@ -153,19 +153,25 @@ impl NatsK8sInterpret {
|
||||
.await
|
||||
}
|
||||
KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => {
|
||||
warn!("
|
||||
https://kubernetes.github.io/ingress-nginx/user-guide/tls/#ssl-passthrough
|
||||
for now tls router s not impleneted for a standard kubernetes cluster. an ingress must be manually created with the annotation
|
||||
nginx.ingress.kubernetes.io/ssl-passthrough: \"true\"
|
||||
");
|
||||
Ok(Outcome::noop("noop".to_string()))
|
||||
//TODO untested
|
||||
K8sIngressScore {
|
||||
name: todo!(),
|
||||
host: todo!(),
|
||||
backend_service: todo!(),
|
||||
port: todo!(),
|
||||
path: todo!(),
|
||||
path_type: todo!(),
|
||||
namespace: todo!(),
|
||||
ingress_class_name: todo!(),
|
||||
}
|
||||
.interpret(inventory, topology)
|
||||
.await
|
||||
// K8sIngressScore {
|
||||
// name: todo!(),
|
||||
// host: todo!(),
|
||||
// backend_service: todo!(),
|
||||
// port: todo!(),
|
||||
// path: todo!(),
|
||||
// path_type: todo!(),
|
||||
// namespace: todo!(),
|
||||
// ingress_class_name: todo!(),
|
||||
// }
|
||||
// .interpret(inventory, topology)
|
||||
// .await
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -221,7 +227,7 @@ impl NatsK8sInterpret {
|
||||
namespace: String,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let mut gateway_gateways = String::new();
|
||||
let admin = SecretManager::get_or_prompt::<NatsAdmin>().await.unwrap();
|
||||
let admin = SecretManager::get_or_prompt::<NatsCredentials>().await.unwrap();
|
||||
let user = admin.user.clone();
|
||||
let password = admin.password.clone();
|
||||
|
||||
@@ -304,6 +310,7 @@ service:
|
||||
ports:
|
||||
gateway:
|
||||
enabled: true
|
||||
publishNotReadyAddresses: true
|
||||
tlsCA:
|
||||
enabled: true
|
||||
secretName: {supercluster_ca_secret_name}
|
||||
@@ -343,8 +350,3 @@ natsBox:
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq, Clone)]
|
||||
struct NatsAdmin {
|
||||
user: String,
|
||||
password: String,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user