Compare commits
24 Commits
e2e-tests-
...
example/vl
| Author | SHA1 | Date | |
|---|---|---|---|
| 304490977c | |||
| 8499f4d1b7 | |||
| aa07f4c8ad | |||
| 77bb138497 | |||
| a16879b1b6 | |||
| f57e6f5957 | |||
| 7605d05de3 | |||
| b244127843 | |||
| 67c3265286 | |||
| d10598d01e | |||
| 61ba7257d0 | |||
| b0e9594d92 | |||
| bfb86f63ce | |||
| d920de34cf | |||
| 4276b9137b | |||
| 6ab88ab8d9 | |||
| 53d0704a35 | |||
| d8ab9d52a4 | |||
| 2cb7aeefc0 | |||
| 16016febcf | |||
| e709de531d | |||
| 6ab0f3a6ab | |||
| 724ab0b888 | |||
| 8b6ce8d069 |
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -7001,6 +7001,18 @@ version = "0.9.5"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
|
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "vllm"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"env_logger",
|
||||||
|
"harmony",
|
||||||
|
"harmony_cli",
|
||||||
|
"k8s-openapi",
|
||||||
|
"log",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "wait-timeout"
|
name = "wait-timeout"
|
||||||
version = "0.2.1"
|
version = "0.2.1"
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ members = [
|
|||||||
"adr/agent_discovery/mdns",
|
"adr/agent_discovery/mdns",
|
||||||
"brocade",
|
"brocade",
|
||||||
"harmony_agent",
|
"harmony_agent",
|
||||||
"harmony_agent/deploy", "harmony_node_readiness", "harmony-k8s",
|
"harmony_agent/deploy", "harmony_node_readiness", "harmony-k8s", "examples/vllm",
|
||||||
]
|
]
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ async fn main() {
|
|||||||
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
|
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
|
||||||
// "default" namespace, 1 instance, 1Gi storage
|
// "default" namespace, 1 instance, 1Gi storage
|
||||||
},
|
},
|
||||||
hostname: "postgrestest.sto1.nationtech.io".to_string(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
harmony_cli::run(
|
harmony_cli::run(
|
||||||
|
|||||||
@@ -6,7 +6,10 @@ use harmony::{
|
|||||||
data::{FileContent, FilePath},
|
data::{FileContent, FilePath},
|
||||||
modules::{
|
modules::{
|
||||||
inventory::HarmonyDiscoveryStrategy,
|
inventory::HarmonyDiscoveryStrategy,
|
||||||
okd::{installation::OKDInstallationPipeline, ipxe::OKDIpxeScore},
|
okd::{
|
||||||
|
installation::OKDInstallationPipeline, ipxe::OKDIpxeScore,
|
||||||
|
load_balancer::OKDLoadBalancerScore,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
score::Score,
|
score::Score,
|
||||||
topology::HAClusterTopology,
|
topology::HAClusterTopology,
|
||||||
@@ -32,6 +35,7 @@ async fn main() {
|
|||||||
scores
|
scores
|
||||||
.append(&mut OKDInstallationPipeline::get_all_scores(HarmonyDiscoveryStrategy::MDNS).await);
|
.append(&mut OKDInstallationPipeline::get_all_scores(HarmonyDiscoveryStrategy::MDNS).await);
|
||||||
|
|
||||||
|
scores.push(Box::new(OKDLoadBalancerScore::new(&topology)));
|
||||||
harmony_cli::run(inventory, topology, scores, None)
|
harmony_cli::run(inventory, topology, scores, None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ async fn main() {
|
|||||||
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
|
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
|
||||||
// 1 instance, 1Gi storage
|
// 1 instance, 1Gi storage
|
||||||
},
|
},
|
||||||
hostname: "postgrestest.sto1.nationtech.io".to_string(),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let test_connection = PostgreSQLConnectionScore {
|
let test_connection = PostgreSQLConnectionScore {
|
||||||
|
|||||||
@@ -44,6 +44,7 @@ fn build_large_score() -> LoadBalancerScore {
|
|||||||
],
|
],
|
||||||
listening_port: SocketAddr::V4(SocketAddrV4::new(ipv4!("192.168.0.0"), 49387)),
|
listening_port: SocketAddr::V4(SocketAddrV4::new(ipv4!("192.168.0.0"), 49387)),
|
||||||
health_check: Some(HealthCheck::HTTP(
|
health_check: Some(HealthCheck::HTTP(
|
||||||
|
Some(1993),
|
||||||
"/some_long_ass_path_to_see_how_it_is_displayed_but_it_has_to_be_even_longer"
|
"/some_long_ass_path_to_see_how_it_is_displayed_but_it_has_to_be_even_longer"
|
||||||
.to_string(),
|
.to_string(),
|
||||||
HttpMethod::GET,
|
HttpMethod::GET,
|
||||||
|
|||||||
15
examples/vllm/Cargo.toml
Normal file
15
examples/vllm/Cargo.toml
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
[package]
|
||||||
|
name = "vllm"
|
||||||
|
edition = "2024"
|
||||||
|
version.workspace = true
|
||||||
|
readme.workspace = true
|
||||||
|
license.workspace = true
|
||||||
|
publish = false
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
harmony = { path = "../../harmony" }
|
||||||
|
harmony_cli = { path = "../../harmony_cli" }
|
||||||
|
k8s-openapi = { workspace = true }
|
||||||
|
tokio = { workspace = true }
|
||||||
|
log = { workspace = true }
|
||||||
|
env_logger = { workspace = true }
|
||||||
523
examples/vllm/src/main.rs
Normal file
523
examples/vllm/src/main.rs
Normal file
@@ -0,0 +1,523 @@
|
|||||||
|
//! vLLM Deployment Example for Qwen3.5-27B-FP8 on NVIDIA RTX 5090
|
||||||
|
//!
|
||||||
|
//! This example deploys vLLM serving Qwen3.5-27B with FP8 quantization,
|
||||||
|
//! optimized for single RTX 5090 (32GB VRAM) with tool calling support.
|
||||||
|
//!
|
||||||
|
//! # Architecture & Memory Constraints
|
||||||
|
//!
|
||||||
|
//! **Model Details:**
|
||||||
|
//! - Parameters: 27B (dense, not sparse/MoE)
|
||||||
|
//! - Quantization: FP8 (8-bit weights)
|
||||||
|
//! - Model size: ~27-28GB in memory
|
||||||
|
//! - Native context: 262,144 tokens (will NOT fit in 32GB VRAM)
|
||||||
|
//!
|
||||||
|
//! **VRAM Budget for RTX 5090 (32GB):**
|
||||||
|
//! - Model weights (FP8): ~27GB
|
||||||
|
//! - Framework overhead: ~1-2GB
|
||||||
|
//! - KV cache: ~2-3GB (for 16k context)
|
||||||
|
//! - CUDA context: ~500MB
|
||||||
|
//! - Temporary buffers: ~500MB
|
||||||
|
//! - **Total: ~31-33GB** (tight fit, leaves minimal headroom)
|
||||||
|
//!
|
||||||
|
//! # OpenShift/OKD Requirements
|
||||||
|
//!
|
||||||
|
//! **SCC (Security Context Constraint) Setup:**
|
||||||
|
//!
|
||||||
|
//! The official vLLM container runs as root and writes to `/root/.cache/huggingface`.
|
||||||
|
//! On OpenShift/OKD with the default restricted SCC, containers run as arbitrary UIDs
|
||||||
|
//! and cannot write to `/root`. For testing, grant the `anyuid` SCC:
|
||||||
|
//!
|
||||||
|
//! ```bash
|
||||||
|
//! # As cluster admin, grant anyuid SCC to the namespace's service account:
|
||||||
|
//! oc adm policy add-scc-to-user anyuid -z default -n vllm-qwen
|
||||||
|
//! ```
|
||||||
|
//!
|
||||||
|
//! This allows pods in the `vllm-qwen` namespace to run as root (UID 0).
|
||||||
|
//! For production, consider building a custom vLLM image that runs as non-root.
|
||||||
|
//!
|
||||||
|
//! # Critical Configuration Notes
|
||||||
|
//!
|
||||||
|
//! 1. **GPU_MEMORY_UTILIZATION=1.0**: Maximum GPU memory allocation.
|
||||||
|
//! NEVER decrease this for dense models - CPU offloading destroys performance
|
||||||
|
//! (100-1000x slower) for models where every parameter is used during inference.
|
||||||
|
//!
|
||||||
|
//! 2. **MAX_MODEL_LEN=16384**: Conservative context length that fits in available VRAM.
|
||||||
|
//! Agentic workflows with long tool call histories will need careful context management.
|
||||||
|
//!
|
||||||
|
//! 3. **--language-model-only**: Skips loading the vision encoder, saving ~1-2GB VRAM.
|
||||||
|
//! Essential for fitting the model in 32GB VRAM.
|
||||||
|
//!
|
||||||
|
//! 4. **PVC Size**: 50Gi for HuggingFace cache. Qwen3.5-27B-FP8 is ~30GB.
|
||||||
|
//!
|
||||||
|
//! # Performance Expectations
|
||||||
|
//!
|
||||||
|
//! - Single token latency: ~50-100ms (no CPU offloading)
|
||||||
|
//! - With CPU offloading: ~5-50 seconds per token (unusable for real-time inference)
|
||||||
|
//! - Throughput: ~10-20 tokens/second (single stream, no batching)
|
||||||
|
//!
|
||||||
|
//! # Next Steps for Production
|
||||||
|
//!
|
||||||
|
//! To increase context length:
|
||||||
|
//! 1. Monitor GPU memory: `kubectl exec -it deployment/qwen3-5-27b -- nvidia-smi dmon -s u`
|
||||||
|
//! 2. If stable, increase MAX_MODEL_LEN (try 32768, then 65536)
|
||||||
|
//! 3. If OOM: revert to lower value
|
||||||
|
//!
|
||||||
|
//! For full 262k context, consider:
|
||||||
|
//! - Multi-GPU setup with tensor parallelism (--tensor-parallel-size 8)
|
||||||
|
//! - Or use a smaller model (Qwen3.5-7B-FP8)
|
||||||
|
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
|
use harmony::{
|
||||||
|
inventory::Inventory,
|
||||||
|
modules::{
|
||||||
|
k8s::resource::K8sResourceScore,
|
||||||
|
okd::{
|
||||||
|
crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig},
|
||||||
|
route::OKDRouteScore,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
score::Score,
|
||||||
|
topology::{K8sAnywhereTopology, TlsRouter},
|
||||||
|
};
|
||||||
|
use k8s_openapi::{
|
||||||
|
api::{
|
||||||
|
apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy},
|
||||||
|
core::v1::{
|
||||||
|
Container, ContainerPort, EmptyDirVolumeSource, EnvVar, EnvVarSource,
|
||||||
|
HTTPGetAction, PersistentVolumeClaim, PersistentVolumeClaimSpec,
|
||||||
|
PersistentVolumeClaimVolumeSource, PodSpec, PodTemplateSpec, Probe,
|
||||||
|
ResourceRequirements, Secret, SecretKeySelector, SecretVolumeSource, Service,
|
||||||
|
ServicePort, ServiceSpec, Volume, VolumeMount, VolumeResourceRequirements,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
apimachinery::pkg::{
|
||||||
|
api::resource::Quantity,
|
||||||
|
apis::meta::v1::{LabelSelector, ObjectMeta},
|
||||||
|
util::intstr::IntOrString,
|
||||||
|
},
|
||||||
|
ByteString,
|
||||||
|
};
|
||||||
|
use log::info;
|
||||||
|
|
||||||
|
const NAMESPACE: &str = "vllm-qwen";
|
||||||
|
const MODEL_NAME: &str = "Qwen/Qwen3.5-27B-FP8";
|
||||||
|
const DEPLOYMENT_NAME: &str = "qwen3-5-27b";
|
||||||
|
const SERVICE_NAME: &str = DEPLOYMENT_NAME;
|
||||||
|
const ROUTE_NAME: &str = DEPLOYMENT_NAME;
|
||||||
|
const PVC_NAME: &str = "huggingface-cache";
|
||||||
|
const SECRET_NAME: &str = "hf-token-secret";
|
||||||
|
|
||||||
|
const VLLM_IMAGE: &str = "vllm/vllm-openai:latest";
|
||||||
|
const SERVICE_PORT: u16 = 8000;
|
||||||
|
const TARGET_PORT: u16 = 8000;
|
||||||
|
|
||||||
|
/// Maximum context length for the model (in tokens).
|
||||||
|
///
|
||||||
|
/// **Impact on VRAM:**
|
||||||
|
/// - Qwen3.5-27B uses per-token KV cache storage for the context window
|
||||||
|
/// - Larger context = more KV cache memory required
|
||||||
|
/// - Approximate KV cache per token: ~32KB for FP8 (very rough estimate)
|
||||||
|
/// - 16k tokens ≈ 0.5-1GB KV cache
|
||||||
|
/// - 262k tokens ≈ 8-16GB KV cache (native context length - will NOT fit in 32GB VRAM)
|
||||||
|
///
|
||||||
|
/// **Performance Impact:**
|
||||||
|
/// - Context length directly impacts memory for storing conversation history
|
||||||
|
/// - Agentic workflows with long tool call histories benefit from more context
|
||||||
|
/// - If context > available VRAM, vLLM will OOM and fail to start
|
||||||
|
///
|
||||||
|
/// **Recommendations for RTX 5090 (32GB):**
|
||||||
|
/// - Start with 16384 (conservative, should work)
|
||||||
|
/// - If no OOM, try 32768 (better for agentic workflows)
|
||||||
|
/// - Monitor GPU memory with `nvidia-smi` during operation
|
||||||
|
const MAX_MODEL_LEN: i64 = 16384;
|
||||||
|
|
||||||
|
/// Fraction of GPU memory to allocate for the model (0.0 to 1.0).
|
||||||
|
///
|
||||||
|
/// **CRITICAL WARNING: This is a dense model!**
|
||||||
|
/// Qwen3.5-27B-FP8 is NOT a sparse/mixture-of-experts model. All 27B parameters
|
||||||
|
/// are active during inference. CPU offloading will DESTROY performance.
|
||||||
|
///
|
||||||
|
/// **What this parameter controls:**
|
||||||
|
/// - Controls how much of GPU memory vLLM pre-allocates for:
|
||||||
|
/// 1. Model weights (~27GB for FP8 quantization)
|
||||||
|
/// 2. KV cache for context window
|
||||||
|
/// 3. Activation buffers for inference
|
||||||
|
/// 4. Runtime overhead
|
||||||
|
///
|
||||||
|
/// **VRAM Allocation Example:**
|
||||||
|
/// - GPU: 32GB RTX 5090
|
||||||
|
/// - GPU_MEMORY_UTILIZATION: 0.95
|
||||||
|
/// - vLLM will try to use: 32GB * 0.95 = 30.4GB
|
||||||
|
/// - Model weights: ~27-28GB
|
||||||
|
/// - Remaining for KV cache + runtime: ~2-3GB
|
||||||
|
///
|
||||||
|
/// **If set too LOW (e.g., 0.7):**
|
||||||
|
/// - vLLM restricts itself to 32GB * 0.7 = 22.4GB
|
||||||
|
/// - Model weights alone need ~27GB
|
||||||
|
/// - vLLM will OFFLOAD model weights to CPU memory
|
||||||
|
/// - Performance: **100-1000x slower** (single token generation can take seconds instead of milliseconds)
|
||||||
|
/// - This is catastrophic for a dense model where every layer needs all parameters
|
||||||
|
///
|
||||||
|
/// **If set too HIGH (e.g., 0.99):**
|
||||||
|
/// - vLLM tries to allocate nearly all GPU memory
|
||||||
|
/// - Risk: CUDA OOM if any other process needs GPU memory
|
||||||
|
/// - Risk: KV cache allocation fails during inference
|
||||||
|
/// - System instability
|
||||||
|
///
|
||||||
|
/// **Current Setting: 0.95**
|
||||||
|
/// - Leaves 5% buffer (1.6GB) for CUDA overhead, system processes
|
||||||
|
/// - Maximum allocation for model + KV cache: ~30.4GB
|
||||||
|
/// - Should leave enough headroom for:
|
||||||
|
/// - CUDA context: ~500MB
|
||||||
|
/// - Temporary buffers: ~500MB
|
||||||
|
/// - Safety margin: ~600MB
|
||||||
|
///
|
||||||
|
/// **How to tune:**
|
||||||
|
/// 1. Start with 0.95 (current setting)
|
||||||
|
/// 2. Monitor with `nvidia-smi dmon -s u` during operation
|
||||||
|
/// 3. If OOM during inference: reduce MAX_MODEL_LEN first
|
||||||
|
/// 4. If stable: try increasing MAX_MODEL_LEN before increasing this
|
||||||
|
/// 5. Only increase this if you're certain no other GPU processes run
|
||||||
|
///
|
||||||
|
/// **NEVER decrease this for dense models!**
|
||||||
|
/// If model doesn't fit, use a smaller model or quantization, not CPU offloading.
|
||||||
|
const GPU_MEMORY_UTILIZATION : f32 = 1.0;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
env_logger::init();
|
||||||
|
|
||||||
|
info!("Deploying vLLM with Qwen3.5-27B-FP8 model");
|
||||||
|
info!("Configuration:");
|
||||||
|
info!(" Model: {}", MODEL_NAME);
|
||||||
|
info!(" Max context length: {} tokens", MAX_MODEL_LEN);
|
||||||
|
info!(" GPU memory utilization: {}", GPU_MEMORY_UTILIZATION);
|
||||||
|
info!(" Language model only: true");
|
||||||
|
info!(" Tool calling enabled: true");
|
||||||
|
|
||||||
|
let topology = K8sAnywhereTopology::from_env();
|
||||||
|
let domain = topology
|
||||||
|
.get_internal_domain()
|
||||||
|
.await
|
||||||
|
.ok()
|
||||||
|
.flatten()
|
||||||
|
.unwrap_or_else(|| "cluster.local".to_string());
|
||||||
|
|
||||||
|
let host = format!("{}-{}.apps.{}", SERVICE_NAME, NAMESPACE, domain);
|
||||||
|
info!("Creating route with host: {}", host);
|
||||||
|
|
||||||
|
let scores: Vec<Box<dyn Score<K8sAnywhereTopology>>> = vec![
|
||||||
|
create_namespace(),
|
||||||
|
create_pvc(),
|
||||||
|
create_secret(),
|
||||||
|
create_deployment(),
|
||||||
|
create_service(),
|
||||||
|
create_route(&host),
|
||||||
|
];
|
||||||
|
|
||||||
|
harmony_cli::run(Inventory::autoload(), topology, scores, None)
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Failed to deploy: {}", e))?;
|
||||||
|
|
||||||
|
info!("Successfully deployed vLLM with Qwen3.5-27B-FP8");
|
||||||
|
info!("Access the API at: http://{}.apps.<cluster-domain>", SERVICE_NAME);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_namespace() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||||
|
use k8s_openapi::api::core::v1::Namespace;
|
||||||
|
|
||||||
|
let namespace = Namespace {
|
||||||
|
metadata: ObjectMeta {
|
||||||
|
name: Some(NAMESPACE.to_string()),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
spec: None,
|
||||||
|
status: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Box::new(K8sResourceScore::single(namespace, None))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_pvc() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||||
|
let pvc = PersistentVolumeClaim {
|
||||||
|
metadata: ObjectMeta {
|
||||||
|
name: Some(PVC_NAME.to_string()),
|
||||||
|
namespace: Some(NAMESPACE.to_string()),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
spec: Some(PersistentVolumeClaimSpec {
|
||||||
|
access_modes: Some(vec!["ReadWriteOnce".to_string()]),
|
||||||
|
resources: Some(VolumeResourceRequirements {
|
||||||
|
requests: Some(BTreeMap::from([(
|
||||||
|
"storage".to_string(),
|
||||||
|
Quantity("50Gi".to_string()),
|
||||||
|
)])),
|
||||||
|
limits: None,
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
status: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Box::new(K8sResourceScore::single(
|
||||||
|
pvc,
|
||||||
|
Some(NAMESPACE.to_string()),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_secret() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||||
|
let mut data = BTreeMap::new();
|
||||||
|
data.insert(
|
||||||
|
"token".to_string(),
|
||||||
|
ByteString("".to_string().into_bytes()),
|
||||||
|
);
|
||||||
|
|
||||||
|
let secret = Secret {
|
||||||
|
metadata: ObjectMeta {
|
||||||
|
name: Some(SECRET_NAME.to_string()),
|
||||||
|
namespace: Some(NAMESPACE.to_string()),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
data: Some(data),
|
||||||
|
immutable: Some(false),
|
||||||
|
type_: Some("Opaque".to_string()),
|
||||||
|
string_data: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Box::new(K8sResourceScore::single(
|
||||||
|
secret,
|
||||||
|
Some(NAMESPACE.to_string()),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_deployment() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||||
|
let deployment = Deployment {
|
||||||
|
metadata: ObjectMeta {
|
||||||
|
name: Some(DEPLOYMENT_NAME.to_string()),
|
||||||
|
namespace: Some(NAMESPACE.to_string()),
|
||||||
|
labels: Some(BTreeMap::from([(
|
||||||
|
"app".to_string(),
|
||||||
|
DEPLOYMENT_NAME.to_string(),
|
||||||
|
)])),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
spec: Some(DeploymentSpec {
|
||||||
|
replicas: Some(1),
|
||||||
|
selector: LabelSelector {
|
||||||
|
match_labels: Some(BTreeMap::from([(
|
||||||
|
"app".to_string(),
|
||||||
|
DEPLOYMENT_NAME.to_string(),
|
||||||
|
)])),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
strategy: Some(DeploymentStrategy {
|
||||||
|
type_: Some("Recreate".to_string()),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
template: PodTemplateSpec {
|
||||||
|
metadata: Some(ObjectMeta {
|
||||||
|
labels: Some(BTreeMap::from([(
|
||||||
|
"app".to_string(),
|
||||||
|
DEPLOYMENT_NAME.to_string(),
|
||||||
|
)])),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
spec: Some(PodSpec {
|
||||||
|
node_selector: Some(BTreeMap::from([(
|
||||||
|
"nvidia.com/gpu.product".to_string(),
|
||||||
|
"NVIDIA-GeForce-RTX-5090".to_string(),
|
||||||
|
)])),
|
||||||
|
volumes: Some(vec![
|
||||||
|
Volume {
|
||||||
|
name: "cache-volume".to_string(),
|
||||||
|
persistent_volume_claim: Some(PersistentVolumeClaimVolumeSource {
|
||||||
|
claim_name: PVC_NAME.to_string(),
|
||||||
|
read_only: Some(false),
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
Volume {
|
||||||
|
name: "shm".to_string(),
|
||||||
|
empty_dir: Some(EmptyDirVolumeSource {
|
||||||
|
medium: Some("Memory".to_string()),
|
||||||
|
size_limit: Some(Quantity("4Gi".to_string())),
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
Volume {
|
||||||
|
name: "hf-token".to_string(),
|
||||||
|
secret: Some(SecretVolumeSource {
|
||||||
|
secret_name: Some(SECRET_NAME.to_string()),
|
||||||
|
optional: Some(true),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
containers: vec![Container {
|
||||||
|
name: DEPLOYMENT_NAME.to_string(),
|
||||||
|
image: Some(VLLM_IMAGE.to_string()),
|
||||||
|
command: Some(vec!["/bin/sh".to_string(), "-c".to_string()]),
|
||||||
|
args: Some(vec![build_vllm_command()]),
|
||||||
|
env: Some(vec![
|
||||||
|
EnvVar {
|
||||||
|
name: "HF_TOKEN".to_string(),
|
||||||
|
value_from: Some(EnvVarSource {
|
||||||
|
secret_key_ref: Some(SecretKeySelector {
|
||||||
|
key: "token".to_string(),
|
||||||
|
name: SECRET_NAME.to_string(),
|
||||||
|
optional: Some(true),
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
value: None,
|
||||||
|
},
|
||||||
|
EnvVar {
|
||||||
|
name: "VLLM_WORKER_MULTIPROC_METHOD".to_string(),
|
||||||
|
value: Some("spawn".to_string()),
|
||||||
|
value_from: None,
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
ports: Some(vec![ContainerPort {
|
||||||
|
container_port: SERVICE_PORT as i32,
|
||||||
|
protocol: Some("TCP".to_string()),
|
||||||
|
..Default::default()
|
||||||
|
}]),
|
||||||
|
resources: Some(ResourceRequirements {
|
||||||
|
limits: Some(BTreeMap::from([
|
||||||
|
("cpu".to_string(), Quantity("10".to_string())),
|
||||||
|
("memory".to_string(), Quantity("30Gi".to_string())),
|
||||||
|
("nvidia.com/gpu".to_string(), Quantity("1".to_string())),
|
||||||
|
])),
|
||||||
|
requests: Some(BTreeMap::from([
|
||||||
|
("cpu".to_string(), Quantity("2".to_string())),
|
||||||
|
("memory".to_string(), Quantity("10Gi".to_string())),
|
||||||
|
("nvidia.com/gpu".to_string(), Quantity("1".to_string())),
|
||||||
|
])),
|
||||||
|
claims: None,
|
||||||
|
}),
|
||||||
|
volume_mounts: Some(vec![
|
||||||
|
VolumeMount {
|
||||||
|
name: "cache-volume".to_string(),
|
||||||
|
mount_path: "/root/.cache/huggingface".to_string(),
|
||||||
|
read_only: Some(false),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
VolumeMount {
|
||||||
|
name: "shm".to_string(),
|
||||||
|
mount_path: "/dev/shm".to_string(),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
VolumeMount {
|
||||||
|
name: "hf-token".to_string(),
|
||||||
|
mount_path: "/etc/secrets/hf-token".to_string(),
|
||||||
|
read_only: Some(true),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
]),
|
||||||
|
liveness_probe: Some(Probe {
|
||||||
|
http_get: Some(HTTPGetAction {
|
||||||
|
path: Some("/health".to_string()),
|
||||||
|
port: IntOrString::Int(SERVICE_PORT as i32),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
initial_delay_seconds: Some(300),
|
||||||
|
period_seconds: Some(30),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
readiness_probe: Some(Probe {
|
||||||
|
http_get: Some(HTTPGetAction {
|
||||||
|
path: Some("/health".to_string()),
|
||||||
|
port: IntOrString::Int(SERVICE_PORT as i32),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
initial_delay_seconds: Some(120),
|
||||||
|
period_seconds: Some(10),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
}],
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
status: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Box::new(K8sResourceScore::single(
|
||||||
|
deployment,
|
||||||
|
Some(NAMESPACE.to_string()),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_vllm_command() -> String {
|
||||||
|
format!(
|
||||||
|
"vllm serve {} \
|
||||||
|
--port {} \
|
||||||
|
--max-model-len {} \
|
||||||
|
--gpu-memory-utilization {} \
|
||||||
|
--reasoning-parser qwen3 \
|
||||||
|
--enable-auto-tool-choice \
|
||||||
|
--tool-call-parser qwen3_coder \
|
||||||
|
--language-model-only",
|
||||||
|
MODEL_NAME, SERVICE_PORT, MAX_MODEL_LEN, GPU_MEMORY_UTILIZATION
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_service() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||||
|
let service = Service {
|
||||||
|
metadata: ObjectMeta {
|
||||||
|
name: Some(SERVICE_NAME.to_string()),
|
||||||
|
namespace: Some(NAMESPACE.to_string()),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
spec: Some(ServiceSpec {
|
||||||
|
ports: Some(vec![ServicePort {
|
||||||
|
name: Some("http".to_string()),
|
||||||
|
port: SERVICE_PORT as i32,
|
||||||
|
protocol: Some("TCP".to_string()),
|
||||||
|
target_port: Some(IntOrString::Int(TARGET_PORT as i32)),
|
||||||
|
..Default::default()
|
||||||
|
}]),
|
||||||
|
selector: Some(BTreeMap::from([(
|
||||||
|
"app".to_string(),
|
||||||
|
DEPLOYMENT_NAME.to_string(),
|
||||||
|
)])),
|
||||||
|
type_: Some("ClusterIP".to_string()),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
status: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
Box::new(K8sResourceScore::single(
|
||||||
|
service,
|
||||||
|
Some(NAMESPACE.to_string()),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_route(host: &str) -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||||
|
let route_spec = RouteSpec {
|
||||||
|
to: RouteTargetReference {
|
||||||
|
kind: "Service".to_string(),
|
||||||
|
name: SERVICE_NAME.to_string(),
|
||||||
|
weight: Some(100),
|
||||||
|
},
|
||||||
|
host: Some(host.to_string()),
|
||||||
|
port: Some(RoutePort {
|
||||||
|
target_port: SERVICE_PORT as u16,
|
||||||
|
}),
|
||||||
|
tls: Some(TLSConfig {
|
||||||
|
termination: "edge".to_string(),
|
||||||
|
insecure_edge_termination_policy: Some("Redirect".to_string()),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
wildcard_policy: None,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
Box::new(OKDRouteScore::new(ROUTE_NAME, NAMESPACE, route_spec))
|
||||||
|
}
|
||||||
@@ -109,6 +109,13 @@ impl K8sclient for K8sAnywhereTopology {
|
|||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl TlsRouter for K8sAnywhereTopology {
|
impl TlsRouter for K8sAnywhereTopology {
|
||||||
|
async fn get_public_domain(&self) -> Result<String, String> {
|
||||||
|
match &self.config.public_domain {
|
||||||
|
Some(public_domain) => Ok(public_domain.to_string()),
|
||||||
|
None => Err("Public domain not available".to_string()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_internal_domain(&self) -> Result<Option<String>, String> {
|
async fn get_internal_domain(&self) -> Result<Option<String>, String> {
|
||||||
match self.get_k8s_distribution().await.map_err(|e| {
|
match self.get_k8s_distribution().await.map_err(|e| {
|
||||||
format!(
|
format!(
|
||||||
@@ -1124,6 +1131,7 @@ pub struct K8sAnywhereConfig {
|
|||||||
///
|
///
|
||||||
/// If the context name is not found, it will fail to initialize.
|
/// If the context name is not found, it will fail to initialize.
|
||||||
pub k8s_context: Option<String>,
|
pub k8s_context: Option<String>,
|
||||||
|
public_domain: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl K8sAnywhereConfig {
|
impl K8sAnywhereConfig {
|
||||||
@@ -1151,6 +1159,7 @@ impl K8sAnywhereConfig {
|
|||||||
|
|
||||||
let mut kubeconfig: Option<String> = None;
|
let mut kubeconfig: Option<String> = None;
|
||||||
let mut k8s_context: Option<String> = None;
|
let mut k8s_context: Option<String> = None;
|
||||||
|
let mut public_domain: Option<String> = None;
|
||||||
|
|
||||||
for part in env_var_value.split(',') {
|
for part in env_var_value.split(',') {
|
||||||
let kv: Vec<&str> = part.splitn(2, '=').collect();
|
let kv: Vec<&str> = part.splitn(2, '=').collect();
|
||||||
@@ -1158,6 +1167,7 @@ impl K8sAnywhereConfig {
|
|||||||
match kv[0].trim() {
|
match kv[0].trim() {
|
||||||
"kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()),
|
"kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()),
|
||||||
"context" => k8s_context = Some(kv[1].trim().to_string()),
|
"context" => k8s_context = Some(kv[1].trim().to_string()),
|
||||||
|
"public_domain" => public_domain = Some(kv[1].trim().to_string()),
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1175,6 +1185,7 @@ impl K8sAnywhereConfig {
|
|||||||
K8sAnywhereConfig {
|
K8sAnywhereConfig {
|
||||||
kubeconfig,
|
kubeconfig,
|
||||||
k8s_context,
|
k8s_context,
|
||||||
|
public_domain,
|
||||||
use_system_kubeconfig,
|
use_system_kubeconfig,
|
||||||
autoinstall: false,
|
autoinstall: false,
|
||||||
use_local_k3d: false,
|
use_local_k3d: false,
|
||||||
@@ -1217,6 +1228,7 @@ impl K8sAnywhereConfig {
|
|||||||
use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D")
|
use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D")
|
||||||
.map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)),
|
.map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)),
|
||||||
k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(),
|
k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(),
|
||||||
|
public_domain: std::env::var("HARMONY_PUBLIC_DOMAIN").ok(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -106,6 +106,7 @@ pub enum SSL {
|
|||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Serialize)]
|
#[derive(Debug, Clone, PartialEq, Serialize)]
|
||||||
pub enum HealthCheck {
|
pub enum HealthCheck {
|
||||||
HTTP(String, HttpMethod, HttpStatusCode, SSL),
|
/// HTTP(None, "/healthz/ready", HttpMethod::GET, HttpStatusCode::Success2xx, SSL::Disabled)
|
||||||
|
HTTP(Option<u16>, String, HttpMethod, HttpStatusCode, SSL),
|
||||||
TCP(Option<u16>),
|
TCP(Option<u16>),
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -122,4 +122,6 @@ pub trait TlsRouter: Send + Sync {
|
|||||||
|
|
||||||
/// Returns the port that this router exposes externally.
|
/// Returns the port that this router exposes externally.
|
||||||
async fn get_router_port(&self) -> u16;
|
async fn get_router_port(&self) -> u16;
|
||||||
|
|
||||||
|
async fn get_public_domain(&self) -> Result<String, String>;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -216,7 +216,15 @@ pub(crate) fn get_health_check_for_backend(
|
|||||||
SSL::Other(other.to_string())
|
SSL::Other(other.to_string())
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Some(HealthCheck::HTTP(path, method, status_code, ssl))
|
|
||||||
|
let port = haproxy_health_check
|
||||||
|
.checkport
|
||||||
|
.content_string()
|
||||||
|
.parse::<u16>()
|
||||||
|
.ok();
|
||||||
|
debug!("Found haproxy healthcheck port {port:?}");
|
||||||
|
|
||||||
|
Some(HealthCheck::HTTP(port, path, method, status_code, ssl))
|
||||||
}
|
}
|
||||||
_ => panic!("Received unsupported health check type {}", uppercase),
|
_ => panic!("Received unsupported health check type {}", uppercase),
|
||||||
}
|
}
|
||||||
@@ -251,7 +259,7 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
|||||||
// frontend points to backend
|
// frontend points to backend
|
||||||
let healthcheck = if let Some(health_check) = &service.health_check {
|
let healthcheck = if let Some(health_check) = &service.health_check {
|
||||||
match health_check {
|
match health_check {
|
||||||
HealthCheck::HTTP(path, http_method, _http_status_code, ssl) => {
|
HealthCheck::HTTP(port, path, http_method, _http_status_code, ssl) => {
|
||||||
let ssl: MaybeString = match ssl {
|
let ssl: MaybeString = match ssl {
|
||||||
SSL::SSL => "ssl".into(),
|
SSL::SSL => "ssl".into(),
|
||||||
SSL::SNI => "sslni".into(),
|
SSL::SNI => "sslni".into(),
|
||||||
@@ -259,14 +267,21 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
|||||||
SSL::Default => "".into(),
|
SSL::Default => "".into(),
|
||||||
SSL::Other(other) => other.as_str().into(),
|
SSL::Other(other) => other.as_str().into(),
|
||||||
};
|
};
|
||||||
|
let path_without_query = path.split_once('?').map_or(path.as_str(), |(p, _)| p);
|
||||||
|
let (port, port_name) = match port {
|
||||||
|
Some(port) => (Some(port.to_string()), port.to_string()),
|
||||||
|
None => (None, "serverport".to_string()),
|
||||||
|
};
|
||||||
|
|
||||||
let haproxy_check = HAProxyHealthCheck {
|
let haproxy_check = HAProxyHealthCheck {
|
||||||
name: format!("HTTP_{http_method}_{path}"),
|
name: format!("HTTP_{http_method}_{path_without_query}_{port_name}"),
|
||||||
uuid: Uuid::new_v4().to_string(),
|
uuid: Uuid::new_v4().to_string(),
|
||||||
http_method: http_method.to_string().into(),
|
http_method: http_method.to_string().to_lowercase().into(),
|
||||||
health_check_type: "http".to_string(),
|
health_check_type: "http".to_string(),
|
||||||
http_uri: path.clone().into(),
|
http_uri: path.clone().into(),
|
||||||
interval: "2s".to_string(),
|
interval: "2s".to_string(),
|
||||||
ssl,
|
ssl,
|
||||||
|
checkport: MaybeString::from(port.map(|p| p.to_string())),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -305,7 +320,10 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
|||||||
let mut backend = HAProxyBackend {
|
let mut backend = HAProxyBackend {
|
||||||
uuid: Uuid::new_v4().to_string(),
|
uuid: Uuid::new_v4().to_string(),
|
||||||
enabled: 1,
|
enabled: 1,
|
||||||
name: format!("backend_{}", service.listening_port),
|
name: format!(
|
||||||
|
"backend_{}",
|
||||||
|
service.listening_port.to_string().replace(':', "_")
|
||||||
|
),
|
||||||
algorithm: "roundrobin".to_string(),
|
algorithm: "roundrobin".to_string(),
|
||||||
random_draws: Some(2),
|
random_draws: Some(2),
|
||||||
stickiness_expire: "30m".to_string(),
|
stickiness_expire: "30m".to_string(),
|
||||||
@@ -337,10 +355,22 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
|||||||
let frontend = Frontend {
|
let frontend = Frontend {
|
||||||
uuid: uuid::Uuid::new_v4().to_string(),
|
uuid: uuid::Uuid::new_v4().to_string(),
|
||||||
enabled: 1,
|
enabled: 1,
|
||||||
name: format!("frontend_{}", service.listening_port),
|
name: format!(
|
||||||
|
"frontend_{}",
|
||||||
|
service.listening_port.to_string().replace(':', "_")
|
||||||
|
),
|
||||||
bind: service.listening_port.to_string(),
|
bind: service.listening_port.to_string(),
|
||||||
mode: "tcp".to_string(), // TODO do not depend on health check here
|
mode: "tcp".to_string(), // TODO do not depend on health check here
|
||||||
default_backend: Some(backend.uuid.clone()),
|
default_backend: Some(backend.uuid.clone()),
|
||||||
|
stickiness_expire: "30m".to_string().into(),
|
||||||
|
stickiness_size: "50k".to_string().into(),
|
||||||
|
stickiness_conn_rate_period: "10s".to_string().into(),
|
||||||
|
stickiness_sess_rate_period: "10s".to_string().into(),
|
||||||
|
stickiness_http_req_rate_period: "10s".to_string().into(),
|
||||||
|
stickiness_http_err_rate_period: "10s".to_string().into(),
|
||||||
|
stickiness_bytes_in_rate_period: "1m".to_string().into(),
|
||||||
|
stickiness_bytes_out_rate_period: "1m".to_string().into(),
|
||||||
|
ssl_hsts_max_age: 15768000,
|
||||||
..Default::default()
|
..Default::default()
|
||||||
};
|
};
|
||||||
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");
|
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
use std::fs::{self};
|
use std::fs::{self};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::PathBuf;
|
||||||
use std::process;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
@@ -65,6 +64,7 @@ pub struct RustWebapp {
|
|||||||
///
|
///
|
||||||
/// This is the place to put the public host name if this is a public facing webapp.
|
/// This is the place to put the public host name if this is a public facing webapp.
|
||||||
pub dns: String,
|
pub dns: String,
|
||||||
|
pub version: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Application for RustWebapp {
|
impl Application for RustWebapp {
|
||||||
@@ -465,6 +465,7 @@ impl RustWebapp {
|
|||||||
|
|
||||||
let app_name = &self.name;
|
let app_name = &self.name;
|
||||||
let service_port = self.service_port;
|
let service_port = self.service_port;
|
||||||
|
let version = &self.version;
|
||||||
// Create Chart.yaml
|
// Create Chart.yaml
|
||||||
let chart_yaml = format!(
|
let chart_yaml = format!(
|
||||||
r#"
|
r#"
|
||||||
@@ -472,7 +473,7 @@ apiVersion: v2
|
|||||||
name: {chart_name}
|
name: {chart_name}
|
||||||
description: A Helm chart for the {app_name} web application.
|
description: A Helm chart for the {app_name} web application.
|
||||||
type: application
|
type: application
|
||||||
version: 0.2.1
|
version: {version}
|
||||||
appVersion: "{image_tag}"
|
appVersion: "{image_tag}"
|
||||||
"#,
|
"#,
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -1,3 +1,7 @@
|
|||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use tokio_retry::{Retry, strategy::ExponentialBackoff};
|
||||||
|
|
||||||
use crate::modules::{
|
use crate::modules::{
|
||||||
cert_manager::{
|
cert_manager::{
|
||||||
capability::{CertificateManagement, CertificateManagementConfig},
|
capability::{CertificateManagement, CertificateManagementConfig},
|
||||||
@@ -69,9 +73,28 @@ where
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| e.to_string())?;
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
self.topology
|
let strategy = ExponentialBackoff::from_millis(250)
|
||||||
|
.factor(2)
|
||||||
|
.max_delay(Duration::from_millis(1000))
|
||||||
|
.take(10);
|
||||||
|
|
||||||
|
Retry::spawn(strategy, || async {
|
||||||
|
log::debug!("Attempting CA cert fetch");
|
||||||
|
|
||||||
|
let res = self
|
||||||
|
.topology
|
||||||
.get_ca_certificate(root_ca_cert_name.into(), &root_ca_config)
|
.get_ca_certificate(root_ca_cert_name.into(), &root_ca_config)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(cert) => Ok(cert),
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("Retryable error: {:?}", e);
|
||||||
|
Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
.await
|
.await
|
||||||
.map_err(|e| e.to_string())
|
.map_err(|e| format!("Retries exhausted: {:?}", e))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,20 @@ use log::warn;
|
|||||||
use crate::topology::{FailoverTopology, TlsRoute, TlsRouter};
|
use crate::topology::{FailoverTopology, TlsRoute, TlsRouter};
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<T: TlsRouter> TlsRouter for FailoverTopology<T> {
|
impl<T: TlsRouter + Send> TlsRouter for FailoverTopology<T> {
|
||||||
|
async fn get_public_domain(&self) -> Result<String, String> {
|
||||||
|
/*
|
||||||
|
let primary_domain = self
|
||||||
|
.primary
|
||||||
|
.get_public_domain()
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
|
Ok(primary_domain)
|
||||||
|
*/
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
async fn get_internal_domain(&self) -> Result<Option<String>, String> {
|
async fn get_internal_domain(&self) -> Result<Option<String>, String> {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use harmony_types::id::Id;
|
use harmony_types::id::Id;
|
||||||
|
use k8s_openapi::api::scheduling::v1::PriorityClass;
|
||||||
use k8s_openapi::api::{
|
use k8s_openapi::api::{
|
||||||
apps::v1::{DaemonSet, DaemonSetSpec},
|
apps::v1::{DaemonSet, DaemonSetSpec},
|
||||||
core::v1::{
|
core::v1::{
|
||||||
@@ -144,6 +145,19 @@ impl<T: Topology + K8sclient> Interpret<T> for NodeHealthInterpret {
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// PriorityClass
|
||||||
|
let priority_class_name = "node-healthcheck-critical".to_string();
|
||||||
|
let priority_class = PriorityClass {
|
||||||
|
metadata: ObjectMeta {
|
||||||
|
name: Some(priority_class_name.clone()),
|
||||||
|
..ObjectMeta::default()
|
||||||
|
},
|
||||||
|
value: 1000000000,
|
||||||
|
global_default: Some(false),
|
||||||
|
preemption_policy: Some("PreemptLowerPriority".to_string()),
|
||||||
|
description: Some("Highest priority for node health check daemonset - can preempt lower priority pods".to_string()),
|
||||||
|
};
|
||||||
|
|
||||||
// DaemonSet
|
// DaemonSet
|
||||||
let mut daemonset_labels = BTreeMap::new();
|
let mut daemonset_labels = BTreeMap::new();
|
||||||
daemonset_labels.insert("app".to_string(), "node-healthcheck".to_string());
|
daemonset_labels.insert("app".to_string(), "node-healthcheck".to_string());
|
||||||
@@ -168,6 +182,7 @@ impl<T: Topology + K8sclient> Interpret<T> for NodeHealthInterpret {
|
|||||||
spec: Some(PodSpec {
|
spec: Some(PodSpec {
|
||||||
service_account_name: Some(service_account_name.clone()),
|
service_account_name: Some(service_account_name.clone()),
|
||||||
host_network: Some(true),
|
host_network: Some(true),
|
||||||
|
priority_class_name: Some(priority_class_name),
|
||||||
tolerations: Some(vec![Toleration {
|
tolerations: Some(vec![Toleration {
|
||||||
operator: Some("Exists".to_string()),
|
operator: Some("Exists".to_string()),
|
||||||
..Toleration::default()
|
..Toleration::default()
|
||||||
@@ -182,6 +197,7 @@ impl<T: Topology + K8sclient> Interpret<T> for NodeHealthInterpret {
|
|||||||
name: "NODE_NAME".to_string(),
|
name: "NODE_NAME".to_string(),
|
||||||
value_from: Some(EnvVarSource {
|
value_from: Some(EnvVarSource {
|
||||||
field_ref: Some(ObjectFieldSelector {
|
field_ref: Some(ObjectFieldSelector {
|
||||||
|
api_version: Some("v1".to_string()),
|
||||||
field_path: "spec.nodeName".to_string(),
|
field_path: "spec.nodeName".to_string(),
|
||||||
..ObjectFieldSelector::default()
|
..ObjectFieldSelector::default()
|
||||||
}),
|
}),
|
||||||
@@ -233,6 +249,9 @@ impl<T: Topology + K8sclient> Interpret<T> for NodeHealthInterpret {
|
|||||||
K8sResourceScore::single(cluster_role_binding, None)
|
K8sResourceScore::single(cluster_role_binding, None)
|
||||||
.interpret(inventory, topology)
|
.interpret(inventory, topology)
|
||||||
.await?;
|
.await?;
|
||||||
|
K8sResourceScore::single(priority_class, None)
|
||||||
|
.interpret(inventory, topology)
|
||||||
|
.await?;
|
||||||
K8sResourceScore::single(daemon_set, Some(namespace_name.clone()))
|
K8sResourceScore::single(daemon_set, Some(namespace_name.clone()))
|
||||||
.interpret(inventory, topology)
|
.interpret(inventory, topology)
|
||||||
.await?;
|
.await?;
|
||||||
|
|||||||
@@ -41,6 +41,7 @@ impl OKDBootstrapLoadBalancerScore {
|
|||||||
backend_servers: Self::topology_to_backend_server(topology, 6443),
|
backend_servers: Self::topology_to_backend_server(topology, 6443),
|
||||||
listening_port: SocketAddr::new(private_ip, 6443),
|
listening_port: SocketAddr::new(private_ip, 6443),
|
||||||
health_check: Some(HealthCheck::HTTP(
|
health_check: Some(HealthCheck::HTTP(
|
||||||
|
None,
|
||||||
"/readyz".to_string(),
|
"/readyz".to_string(),
|
||||||
HttpMethod::GET,
|
HttpMethod::GET,
|
||||||
HttpStatusCode::Success2xx,
|
HttpStatusCode::Success2xx,
|
||||||
|
|||||||
@@ -58,12 +58,12 @@ impl OKDLoadBalancerScore {
|
|||||||
LoadBalancerService {
|
LoadBalancerService {
|
||||||
backend_servers: Self::nodes_to_backend_server(topology, 80),
|
backend_servers: Self::nodes_to_backend_server(topology, 80),
|
||||||
listening_port: SocketAddr::new(public_ip, 80),
|
listening_port: SocketAddr::new(public_ip, 80),
|
||||||
health_check: Some(HealthCheck::TCP(None)),
|
health_check: None,
|
||||||
},
|
},
|
||||||
LoadBalancerService {
|
LoadBalancerService {
|
||||||
backend_servers: Self::nodes_to_backend_server(topology, 443),
|
backend_servers: Self::nodes_to_backend_server(topology, 443),
|
||||||
listening_port: SocketAddr::new(public_ip, 443),
|
listening_port: SocketAddr::new(public_ip, 443),
|
||||||
health_check: Some(HealthCheck::TCP(None)),
|
health_check: None,
|
||||||
},
|
},
|
||||||
];
|
];
|
||||||
|
|
||||||
@@ -71,12 +71,24 @@ impl OKDLoadBalancerScore {
|
|||||||
LoadBalancerService {
|
LoadBalancerService {
|
||||||
backend_servers: Self::nodes_to_backend_server(topology, 80),
|
backend_servers: Self::nodes_to_backend_server(topology, 80),
|
||||||
listening_port: SocketAddr::new(public_ip, 80),
|
listening_port: SocketAddr::new(public_ip, 80),
|
||||||
health_check: Some(HealthCheck::TCP(None)),
|
health_check: Some(HealthCheck::HTTP(
|
||||||
|
Some(25001),
|
||||||
|
"/health?check=okd_router_1936,node_ready".to_string(),
|
||||||
|
HttpMethod::GET,
|
||||||
|
HttpStatusCode::Success2xx,
|
||||||
|
SSL::Default,
|
||||||
|
)),
|
||||||
},
|
},
|
||||||
LoadBalancerService {
|
LoadBalancerService {
|
||||||
backend_servers: Self::nodes_to_backend_server(topology, 443),
|
backend_servers: Self::nodes_to_backend_server(topology, 443),
|
||||||
listening_port: SocketAddr::new(public_ip, 443),
|
listening_port: SocketAddr::new(public_ip, 443),
|
||||||
health_check: Some(HealthCheck::TCP(None)),
|
health_check: Some(HealthCheck::HTTP(
|
||||||
|
Some(25001),
|
||||||
|
"/health?check=okd_router_1936,node_ready".to_string(),
|
||||||
|
HttpMethod::GET,
|
||||||
|
HttpStatusCode::Success2xx,
|
||||||
|
SSL::Default,
|
||||||
|
)),
|
||||||
},
|
},
|
||||||
LoadBalancerService {
|
LoadBalancerService {
|
||||||
backend_servers: Self::control_plane_to_backend_server(topology, 22623),
|
backend_servers: Self::control_plane_to_backend_server(topology, 22623),
|
||||||
@@ -87,6 +99,7 @@ impl OKDLoadBalancerScore {
|
|||||||
backend_servers: Self::control_plane_to_backend_server(topology, 6443),
|
backend_servers: Self::control_plane_to_backend_server(topology, 6443),
|
||||||
listening_port: SocketAddr::new(public_ip, 6443),
|
listening_port: SocketAddr::new(public_ip, 6443),
|
||||||
health_check: Some(HealthCheck::HTTP(
|
health_check: Some(HealthCheck::HTTP(
|
||||||
|
None,
|
||||||
"/readyz".to_string(),
|
"/readyz".to_string(),
|
||||||
HttpMethod::GET,
|
HttpMethod::GET,
|
||||||
HttpStatusCode::Success2xx,
|
HttpStatusCode::Success2xx,
|
||||||
@@ -298,28 +311,6 @@ mod tests {
|
|||||||
assert_eq!(private_service_22623.backend_servers.len(), 3);
|
assert_eq!(private_service_22623.backend_servers.len(), 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_private_service_port_6443_only_control_plane() {
|
|
||||||
let topology = create_test_topology();
|
|
||||||
let score = OKDLoadBalancerScore::new(&topology);
|
|
||||||
|
|
||||||
let private_service_6443 = score
|
|
||||||
.load_balancer_score
|
|
||||||
.private_services
|
|
||||||
.iter()
|
|
||||||
.find(|s| s.listening_port.port() == 6443)
|
|
||||||
.expect("Private service on port 6443 not found");
|
|
||||||
|
|
||||||
assert_eq!(private_service_6443.backend_servers.len(), 3);
|
|
||||||
assert!(
|
|
||||||
matches!(
|
|
||||||
private_service_6443.health_check,
|
|
||||||
Some(HealthCheck::HTTP(_, _, _, _))
|
|
||||||
),
|
|
||||||
"Expected HTTP health check for port 6443"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_all_backend_servers_have_correct_port() {
|
fn test_all_backend_servers_have_correct_port() {
|
||||||
let topology = create_test_topology();
|
let topology = create_test_topology();
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ pub struct PostgreSQLConfig {
|
|||||||
/// settings incompatible with the default CNPG behavior.
|
/// settings incompatible with the default CNPG behavior.
|
||||||
pub namespace: String,
|
pub namespace: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PostgreSQLConfig {
|
impl PostgreSQLConfig {
|
||||||
pub fn with_namespace(&self, namespace: &str) -> PostgreSQLConfig {
|
pub fn with_namespace(&self, namespace: &str) -> PostgreSQLConfig {
|
||||||
let mut new = self.clone();
|
let mut new = self.clone();
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
use std::collections::BTreeMap;
|
use std::collections::BTreeMap;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use kube::{CustomResource, api::ObjectMeta};
|
use kube::{CustomResource, api::ObjectMeta};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@@ -19,6 +20,10 @@ pub struct ClusterSpec {
|
|||||||
pub image_name: Option<String>,
|
pub image_name: Option<String>,
|
||||||
pub storage: Storage,
|
pub storage: Storage,
|
||||||
pub bootstrap: Bootstrap,
|
pub bootstrap: Bootstrap,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub external_clusters: Option<Vec<ExternalCluster>>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub replica: Option<ReplicaSpec>,
|
||||||
/// This must be set to None if you want cnpg to generate a superuser secret
|
/// This must be set to None if you want cnpg to generate a superuser secret
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub superuser_secret: Option<BTreeMap<String, String>>,
|
pub superuser_secret: Option<BTreeMap<String, String>>,
|
||||||
@@ -41,6 +46,8 @@ impl Default for ClusterSpec {
|
|||||||
image_name: None,
|
image_name: None,
|
||||||
storage: Storage::default(),
|
storage: Storage::default(),
|
||||||
bootstrap: Bootstrap::default(),
|
bootstrap: Bootstrap::default(),
|
||||||
|
external_clusters: None,
|
||||||
|
replica: None,
|
||||||
superuser_secret: None,
|
superuser_secret: None,
|
||||||
enable_superuser_access: false,
|
enable_superuser_access: false,
|
||||||
}
|
}
|
||||||
@@ -56,7 +63,13 @@ pub struct Storage {
|
|||||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct Bootstrap {
|
pub struct Bootstrap {
|
||||||
pub initdb: Initdb,
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub initdb: Option<Initdb>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub recovery: Option<Recovery>,
|
||||||
|
#[serde(rename = "pg_basebackup")]
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub pg_basebackup: Option<PgBaseBackup>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||||
@@ -65,3 +78,50 @@ pub struct Initdb {
|
|||||||
pub database: String,
|
pub database: String,
|
||||||
pub owner: String,
|
pub owner: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct Recovery {
|
||||||
|
pub source: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||||
|
pub struct PgBaseBackup {
|
||||||
|
pub source: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct ExternalCluster {
|
||||||
|
pub name: String,
|
||||||
|
pub connection_parameters: HashMap<String, String>,
|
||||||
|
pub ssl_key: Option<SecretKeySelector>,
|
||||||
|
pub ssl_cert: Option<SecretKeySelector>,
|
||||||
|
pub ssl_root_cert: Option<SecretKeySelector>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct ConnectionParameters {
|
||||||
|
pub host: String,
|
||||||
|
pub user: String,
|
||||||
|
pub dbname: String,
|
||||||
|
pub sslmode: String,
|
||||||
|
pub sslnegotiation: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct ReplicaSpec {
|
||||||
|
pub enabled: bool,
|
||||||
|
pub source: String,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub primary: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct SecretKeySelector {
|
||||||
|
pub name: String,
|
||||||
|
pub key: String,
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ use log::debug;
|
|||||||
use log::info;
|
use log::info;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use crate::interpret::InterpretError;
|
||||||
|
use crate::topology::TlsRoute;
|
||||||
use crate::topology::TlsRouter;
|
use crate::topology::TlsRouter;
|
||||||
use crate::{
|
use crate::{
|
||||||
modules::postgresql::capability::{
|
modules::postgresql::capability::{
|
||||||
@@ -49,8 +51,18 @@ impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
|
|||||||
// TODO we should be getting the public endpoint for a service by calling a method on
|
// TODO we should be getting the public endpoint for a service by calling a method on
|
||||||
// TlsRouter capability.
|
// TlsRouter capability.
|
||||||
// Something along the lines of `TlsRouter::get_hostname_for_service(...).await?;`
|
// Something along the lines of `TlsRouter::get_hostname_for_service(...).await?;`
|
||||||
|
let host = format!(
|
||||||
|
"{}.{}.{}",
|
||||||
|
config.cluster_name,
|
||||||
|
config.namespace,
|
||||||
|
self.primary
|
||||||
|
.get_public_domain()
|
||||||
|
.await
|
||||||
|
.expect("failed to retrieve public domain")
|
||||||
|
.to_string()
|
||||||
|
);
|
||||||
let endpoint = PostgreSQLEndpoint {
|
let endpoint = PostgreSQLEndpoint {
|
||||||
host: "postgrestest.sto1.nationtech.io".to_string(),
|
host,
|
||||||
port: self.primary.get_router_port().await,
|
port: self.primary.get_router_port().await,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -59,6 +71,46 @@ impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
|
|||||||
endpoint.host, endpoint.port
|
endpoint.host, endpoint.port
|
||||||
);
|
);
|
||||||
|
|
||||||
|
info!("installing primary postgres route");
|
||||||
|
let prim_hostname = format!(
|
||||||
|
"{}.{}.{}",
|
||||||
|
config.cluster_name,
|
||||||
|
config.namespace,
|
||||||
|
self.primary.get_public_domain().await?
|
||||||
|
);
|
||||||
|
let rw_backend = format!("{}-rw", config.cluster_name);
|
||||||
|
let tls_route = TlsRoute {
|
||||||
|
hostname: prim_hostname,
|
||||||
|
backend: rw_backend,
|
||||||
|
target_port: 5432,
|
||||||
|
namespace: config.namespace.clone(),
|
||||||
|
};
|
||||||
|
// Expose RW publicly via TLS passthrough
|
||||||
|
self.primary
|
||||||
|
.install_route(tls_route.clone())
|
||||||
|
.await
|
||||||
|
.map_err(|e| InterpretError::new(e))?;
|
||||||
|
|
||||||
|
info!("installing replica postgres route");
|
||||||
|
let rep_hostname = format!(
|
||||||
|
"{}.{}.{}",
|
||||||
|
config.cluster_name,
|
||||||
|
config.namespace,
|
||||||
|
self.replica.get_public_domain().await?
|
||||||
|
);
|
||||||
|
let rw_backend = format!("{}-rw", config.cluster_name);
|
||||||
|
let tls_route = TlsRoute {
|
||||||
|
hostname: rep_hostname,
|
||||||
|
backend: rw_backend,
|
||||||
|
target_port: 5432,
|
||||||
|
namespace: config.namespace.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Expose RW publicly via TLS passthrough
|
||||||
|
self.replica
|
||||||
|
.install_route(tls_route.clone())
|
||||||
|
.await
|
||||||
|
.map_err(|e| InterpretError::new(e))?;
|
||||||
info!("Configuring replica connection parameters and bootstrap");
|
info!("Configuring replica connection parameters and bootstrap");
|
||||||
|
|
||||||
let mut connection_parameters = HashMap::new();
|
let mut connection_parameters = HashMap::new();
|
||||||
|
|||||||
@@ -1,14 +1,21 @@
|
|||||||
use std::collections::BTreeMap;
|
use crate::data::Version;
|
||||||
|
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
|
||||||
|
use crate::inventory::Inventory;
|
||||||
|
|
||||||
use serde::Serialize;
|
|
||||||
|
|
||||||
use crate::interpret::Interpret;
|
|
||||||
use crate::modules::k8s::resource::K8sResourceScore;
|
use crate::modules::k8s::resource::K8sResourceScore;
|
||||||
use crate::modules::postgresql::capability::PostgreSQLConfig;
|
use crate::modules::postgresql::capability::PostgreSQLConfig;
|
||||||
use crate::modules::postgresql::cnpg::{Bootstrap, Cluster, ClusterSpec, Initdb, Storage};
|
use crate::modules::postgresql::cnpg::{
|
||||||
|
Bootstrap, Cluster, ClusterSpec, ExternalCluster, Initdb, PgBaseBackup, ReplicaSpec,
|
||||||
|
SecretKeySelector, Storage,
|
||||||
|
};
|
||||||
use crate::score::Score;
|
use crate::score::Score;
|
||||||
use crate::topology::{K8sclient, Topology};
|
use crate::topology::{K8sclient, Topology};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use harmony_types::id::Id;
|
||||||
|
use k8s_openapi::ByteString;
|
||||||
|
use k8s_openapi::api::core::v1::Secret;
|
||||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
/// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG.
|
/// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG.
|
||||||
///
|
///
|
||||||
@@ -51,6 +58,30 @@ impl K8sPostgreSQLScore {
|
|||||||
|
|
||||||
impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore {
|
impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore {
|
||||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||||
|
Box::new(K8sPostgreSQLInterpret {
|
||||||
|
config: self.config.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn name(&self) -> String {
|
||||||
|
format!("PostgreSQLScore({})", self.config.namespace)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct K8sPostgreSQLInterpret {
|
||||||
|
config: PostgreSQLConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: Topology + K8sclient> Interpret<T> for K8sPostgreSQLInterpret {
|
||||||
|
async fn execute(
|
||||||
|
&self,
|
||||||
|
inventory: &Inventory,
|
||||||
|
topology: &T,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
match &self.config.role {
|
||||||
|
super::capability::PostgreSQLClusterRole::Primary => {
|
||||||
let metadata = ObjectMeta {
|
let metadata = ObjectMeta {
|
||||||
name: Some(self.config.cluster_name.clone()),
|
name: Some(self.config.cluster_name.clone()),
|
||||||
namespace: Some(self.config.namespace.clone()),
|
namespace: Some(self.config.namespace.clone()),
|
||||||
@@ -63,25 +94,148 @@ impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore {
|
|||||||
size: self.config.storage_size.to_string(),
|
size: self.config.storage_size.to_string(),
|
||||||
},
|
},
|
||||||
bootstrap: Bootstrap {
|
bootstrap: Bootstrap {
|
||||||
initdb: Initdb {
|
initdb: Some(Initdb {
|
||||||
database: "app".to_string(),
|
database: "app".to_string(),
|
||||||
owner: "app".to_string(),
|
owner: "app".to_string(),
|
||||||
|
}),
|
||||||
|
recovery: None,
|
||||||
|
pg_basebackup: None,
|
||||||
},
|
},
|
||||||
},
|
|
||||||
// superuser_secret: Some(BTreeMap::from([(
|
|
||||||
// "name".to_string(),
|
|
||||||
// format!("{}-superuser", self.config.cluster_name.clone()),
|
|
||||||
// )])),
|
|
||||||
enable_superuser_access: true,
|
enable_superuser_access: true,
|
||||||
..ClusterSpec::default()
|
..ClusterSpec::default()
|
||||||
};
|
};
|
||||||
|
let cluster = Cluster { metadata, spec };
|
||||||
|
|
||||||
|
Ok(
|
||||||
|
K8sResourceScore::single(cluster, Some(self.config.namespace.clone()))
|
||||||
|
.create_interpret()
|
||||||
|
.execute(inventory, topology)
|
||||||
|
.await?,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
super::capability::PostgreSQLClusterRole::Replica(replica_config) => {
|
||||||
|
let metadata = ObjectMeta {
|
||||||
|
name: Some("streaming-replica-certs".to_string()),
|
||||||
|
namespace: Some(self.config.namespace.clone()),
|
||||||
|
..ObjectMeta::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
// The data must be base64-encoded. If you already have PEM strings in your config, encode them:
|
||||||
|
let mut data = std::collections::BTreeMap::new();
|
||||||
|
data.insert(
|
||||||
|
"tls.key".to_string(),
|
||||||
|
ByteString(
|
||||||
|
replica_config
|
||||||
|
.replication_certs
|
||||||
|
.streaming_replica_key_pem
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
data.insert(
|
||||||
|
"tls.crt".to_string(),
|
||||||
|
ByteString(
|
||||||
|
replica_config
|
||||||
|
.replication_certs
|
||||||
|
.streaming_replica_cert_pem
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
data.insert(
|
||||||
|
"ca.crt".to_string(),
|
||||||
|
ByteString(
|
||||||
|
replica_config
|
||||||
|
.replication_certs
|
||||||
|
.ca_cert_pem
|
||||||
|
.as_bytes()
|
||||||
|
.to_vec(),
|
||||||
|
),
|
||||||
|
);
|
||||||
|
|
||||||
|
let secret = Secret {
|
||||||
|
metadata,
|
||||||
|
data: Some(data),
|
||||||
|
string_data: None, // You could use string_data if you prefer raw strings
|
||||||
|
type_: Some("Opaque".to_string()),
|
||||||
|
..Secret::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
K8sResourceScore::single(secret, Some(self.config.namespace.clone()))
|
||||||
|
.create_interpret()
|
||||||
|
.execute(inventory, topology)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let metadata = ObjectMeta {
|
||||||
|
name: Some(self.config.cluster_name.clone()),
|
||||||
|
namespace: Some(self.config.namespace.clone()),
|
||||||
|
..ObjectMeta::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let spec = ClusterSpec {
|
||||||
|
instances: self.config.instances,
|
||||||
|
storage: Storage {
|
||||||
|
size: self.config.storage_size.to_string(),
|
||||||
|
},
|
||||||
|
bootstrap: Bootstrap {
|
||||||
|
initdb: None,
|
||||||
|
recovery: None,
|
||||||
|
pg_basebackup: Some(PgBaseBackup {
|
||||||
|
source: replica_config.primary_cluster_name.clone(),
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
external_clusters: Some(vec![ExternalCluster {
|
||||||
|
name: replica_config.primary_cluster_name.clone(),
|
||||||
|
connection_parameters: replica_config
|
||||||
|
.external_cluster
|
||||||
|
.connection_parameters
|
||||||
|
.clone(),
|
||||||
|
ssl_key: Some(SecretKeySelector {
|
||||||
|
name: "streaming-replica-certs".to_string(),
|
||||||
|
key: "tls.key".to_string(),
|
||||||
|
}),
|
||||||
|
ssl_cert: Some(SecretKeySelector {
|
||||||
|
name: "streaming-replica-certs".to_string(),
|
||||||
|
key: "tls.crt".to_string(),
|
||||||
|
}),
|
||||||
|
ssl_root_cert: Some(SecretKeySelector {
|
||||||
|
name: "streaming-replica-certs".to_string(),
|
||||||
|
key: "ca.crt".to_string(),
|
||||||
|
}),
|
||||||
|
}]),
|
||||||
|
replica: Some(ReplicaSpec {
|
||||||
|
enabled: true,
|
||||||
|
source: replica_config.primary_cluster_name.clone(),
|
||||||
|
primary: None,
|
||||||
|
}),
|
||||||
|
..ClusterSpec::default()
|
||||||
|
};
|
||||||
|
|
||||||
let cluster = Cluster { metadata, spec };
|
let cluster = Cluster { metadata, spec };
|
||||||
|
|
||||||
K8sResourceScore::single(cluster, Some(self.config.namespace.clone())).create_interpret()
|
Ok(
|
||||||
|
K8sResourceScore::single(cluster, Some(self.config.namespace.clone()))
|
||||||
|
.create_interpret()
|
||||||
|
.execute(inventory, topology)
|
||||||
|
.await?,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn name(&self) -> String {
|
fn get_name(&self) -> InterpretName {
|
||||||
format!("PostgreSQLScore({})", self.config.namespace)
|
InterpretName::Custom("K8sPostgreSQLInterpret")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_version(&self) -> Version {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_status(&self) -> InterpretStatus {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_children(&self) -> Vec<Id> {
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,46 +18,31 @@ use crate::topology::Topology;
|
|||||||
/// # Usage
|
/// # Usage
|
||||||
/// ```
|
/// ```
|
||||||
/// use harmony::modules::postgresql::PublicPostgreSQLScore;
|
/// use harmony::modules::postgresql::PublicPostgreSQLScore;
|
||||||
/// let score = PublicPostgreSQLScore::new("harmony", "pg-rw.example.com");
|
/// let score = PublicPostgreSQLScore::new("harmony");
|
||||||
/// ```
|
/// ```
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
pub struct PublicPostgreSQLScore {
|
pub struct PublicPostgreSQLScore {
|
||||||
/// Inner non-public Postgres cluster config.
|
/// Inner non-public Postgres cluster config.
|
||||||
pub config: PostgreSQLConfig,
|
pub config: PostgreSQLConfig,
|
||||||
/// Public hostname for RW TLS passthrough (port 443 → cluster-rw:5432).
|
|
||||||
pub hostname: String,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PublicPostgreSQLScore {
|
impl PublicPostgreSQLScore {
|
||||||
pub fn new(namespace: &str, hostname: &str) -> Self {
|
pub fn new(namespace: &str) -> Self {
|
||||||
Self {
|
Self {
|
||||||
config: PostgreSQLConfig::default().with_namespace(namespace),
|
config: PostgreSQLConfig::default().with_namespace(namespace),
|
||||||
hostname: hostname.to_string(),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore {
|
impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore {
|
||||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||||
let rw_backend = format!("{}-rw", self.config.cluster_name);
|
|
||||||
let tls_route = TlsRoute {
|
|
||||||
namespace: self.config.namespace.clone(),
|
|
||||||
hostname: self.hostname.clone(),
|
|
||||||
backend: rw_backend,
|
|
||||||
target_port: 5432,
|
|
||||||
};
|
|
||||||
|
|
||||||
Box::new(PublicPostgreSQLInterpret {
|
Box::new(PublicPostgreSQLInterpret {
|
||||||
config: self.config.clone(),
|
config: self.config.clone(),
|
||||||
tls_route,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn name(&self) -> String {
|
fn name(&self) -> String {
|
||||||
format!(
|
format!("PublicPostgreSQLScore({})", self.config.namespace)
|
||||||
"PublicPostgreSQLScore({}:{})",
|
|
||||||
self.config.namespace, self.hostname
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -65,7 +50,6 @@ impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore {
|
|||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
struct PublicPostgreSQLInterpret {
|
struct PublicPostgreSQLInterpret {
|
||||||
config: PostgreSQLConfig,
|
config: PostgreSQLConfig,
|
||||||
tls_route: TlsRoute,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -76,15 +60,28 @@ impl<T: Topology + PostgreSQL + TlsRouter> Interpret<T> for PublicPostgreSQLInte
|
|||||||
.await
|
.await
|
||||||
.map_err(|e| InterpretError::new(e))?;
|
.map_err(|e| InterpretError::new(e))?;
|
||||||
|
|
||||||
|
let hostname = format!(
|
||||||
|
"{}.{}.{}",
|
||||||
|
self.config.cluster_name,
|
||||||
|
self.config.namespace,
|
||||||
|
topo.get_public_domain().await?
|
||||||
|
);
|
||||||
|
let rw_backend = format!("{}-rw", self.config.cluster_name);
|
||||||
|
let tls_route = TlsRoute {
|
||||||
|
hostname,
|
||||||
|
backend: rw_backend,
|
||||||
|
target_port: 5432,
|
||||||
|
namespace: self.config.namespace.clone(),
|
||||||
|
};
|
||||||
// Expose RW publicly via TLS passthrough
|
// Expose RW publicly via TLS passthrough
|
||||||
topo.install_route(self.tls_route.clone())
|
topo.install_route(tls_route.clone())
|
||||||
.await
|
.await
|
||||||
.map_err(|e| InterpretError::new(e))?;
|
.map_err(|e| InterpretError::new(e))?;
|
||||||
|
|
||||||
Ok(Outcome::success(format!(
|
Ok(Outcome::success(format!(
|
||||||
"Public CNPG cluster '{}' deployed with TLS passthrough route '{}'",
|
"Public CNPG cluster '{}' deployed with TLS passthrough route '{}'",
|
||||||
self.config.cluster_name.clone(),
|
self.config.cluster_name.clone(),
|
||||||
self.tls_route.hostname
|
tls_route.hostname
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -344,7 +344,7 @@ pub struct StaticMap {
|
|||||||
pub mac: String,
|
pub mac: String,
|
||||||
pub ipaddr: String,
|
pub ipaddr: String,
|
||||||
pub cid: Option<MaybeString>,
|
pub cid: Option<MaybeString>,
|
||||||
pub hostname: String,
|
pub hostname: Option<String>,
|
||||||
pub descr: Option<MaybeString>,
|
pub descr: Option<MaybeString>,
|
||||||
pub winsserver: MaybeString,
|
pub winsserver: MaybeString,
|
||||||
pub dnsserver: MaybeString,
|
pub dnsserver: MaybeString,
|
||||||
@@ -383,24 +383,24 @@ pub struct Outbound {
|
|||||||
|
|
||||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||||
pub struct NatRule {
|
pub struct NatRule {
|
||||||
pub protocol: String,
|
pub protocol: Option<String>,
|
||||||
pub interface: String,
|
pub interface: Option<String>,
|
||||||
pub category: MaybeString,
|
pub category: Option<MaybeString>,
|
||||||
pub ipprotocol: String,
|
pub ipprotocol: Option<String>,
|
||||||
pub descr: MaybeString,
|
pub descr: Option<MaybeString>,
|
||||||
pub tag: MaybeString,
|
pub tag: Option<MaybeString>,
|
||||||
pub tagged: Option<MaybeString>,
|
pub tagged: Option<MaybeString>,
|
||||||
pub poolopts: PoolOpts,
|
pub poolopts: Option<PoolOpts>,
|
||||||
#[yaserde(rename = "associated-rule-id")]
|
#[yaserde(rename = "associated-rule-id")]
|
||||||
pub associated_rule_id: Option<MaybeString>,
|
pub associated_rule_id: Option<MaybeString>,
|
||||||
pub disabled: Option<u8>,
|
pub disabled: Option<u8>,
|
||||||
pub target: String,
|
pub target: Option<String>,
|
||||||
#[yaserde(rename = "local-port")]
|
#[yaserde(rename = "local-port")]
|
||||||
pub local_port: i32,
|
pub local_port: Option<i32>,
|
||||||
pub source: Source,
|
pub source: Option<Source>,
|
||||||
pub destination: Destination,
|
pub destination: Option<Destination>,
|
||||||
pub updated: Updated,
|
pub updated: Option<Updated>,
|
||||||
pub created: Created,
|
pub created: Option<Created>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||||
@@ -1545,7 +1545,7 @@ pub struct Vlans {
|
|||||||
|
|
||||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||||
pub struct Bridges {
|
pub struct Bridges {
|
||||||
pub bridged: Option<MaybeString>,
|
pub bridged: Option<RawXml>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ impl<'a> DhcpConfigLegacyISC<'a> {
|
|||||||
hostname: &str,
|
hostname: &str,
|
||||||
) -> Result<(), DhcpError> {
|
) -> Result<(), DhcpError> {
|
||||||
let mac = mac.to_string();
|
let mac = mac.to_string();
|
||||||
let hostname = hostname.to_string();
|
let hostname = Some(hostname.to_string());
|
||||||
let lan_dhcpd = self.get_lan_dhcpd();
|
let lan_dhcpd = self.get_lan_dhcpd();
|
||||||
let existing_mappings: &mut Vec<StaticMap> = &mut lan_dhcpd.staticmaps;
|
let existing_mappings: &mut Vec<StaticMap> = &mut lan_dhcpd.staticmaps;
|
||||||
|
|
||||||
@@ -121,7 +121,7 @@ impl<'a> DhcpConfigLegacyISC<'a> {
|
|||||||
.map(|entry| StaticMap {
|
.map(|entry| StaticMap {
|
||||||
mac: entry["mac"].as_str().unwrap_or_default().to_string(),
|
mac: entry["mac"].as_str().unwrap_or_default().to_string(),
|
||||||
ipaddr: entry["ipaddr"].as_str().unwrap_or_default().to_string(),
|
ipaddr: entry["ipaddr"].as_str().unwrap_or_default().to_string(),
|
||||||
hostname: entry["hostname"].as_str().unwrap_or_default().to_string(),
|
hostname: Some(entry["hostname"].as_str().unwrap_or_default().to_string()),
|
||||||
descr: entry["descr"].as_str().map(MaybeString::from),
|
descr: entry["descr"].as_str().map(MaybeString::from),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -213,7 +213,7 @@ impl<'a> DhcpConfigDnsMasq<'a> {
|
|||||||
.map(|entry| StaticMap {
|
.map(|entry| StaticMap {
|
||||||
mac: entry["mac"].as_str().unwrap_or_default().to_string(),
|
mac: entry["mac"].as_str().unwrap_or_default().to_string(),
|
||||||
ipaddr: entry["ipaddr"].as_str().unwrap_or_default().to_string(),
|
ipaddr: entry["ipaddr"].as_str().unwrap_or_default().to_string(),
|
||||||
hostname: entry["hostname"].as_str().unwrap_or_default().to_string(),
|
hostname: Some(entry["hostname"].as_str().unwrap_or_default().to_string()),
|
||||||
descr: entry["descr"].as_str().map(MaybeString::from),
|
descr: entry["descr"].as_str().map(MaybeString::from),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user