Compare commits
16 Commits
feat/clust
...
example/vl
| Author | SHA1 | Date | |
|---|---|---|---|
| 304490977c | |||
| 8499f4d1b7 | |||
| aa07f4c8ad | |||
| 77bb138497 | |||
| a16879b1b6 | |||
| f57e6f5957 | |||
| 7605d05de3 | |||
| b244127843 | |||
| 67c3265286 | |||
| d8ab9d52a4 | |||
| 2cb7aeefc0 | |||
| 16016febcf | |||
| e709de531d | |||
| 6ab0f3a6ab | |||
| 724ab0b888 | |||
| 8b6ce8d069 |
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -7001,6 +7001,18 @@ version = "0.9.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
|
||||
|
||||
[[package]]
|
||||
name = "vllm"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"env_logger",
|
||||
"harmony",
|
||||
"harmony_cli",
|
||||
"k8s-openapi",
|
||||
"log",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wait-timeout"
|
||||
version = "0.2.1"
|
||||
|
||||
@@ -18,7 +18,7 @@ members = [
|
||||
"adr/agent_discovery/mdns",
|
||||
"brocade",
|
||||
"harmony_agent",
|
||||
"harmony_agent/deploy", "harmony_node_readiness", "harmony-k8s",
|
||||
"harmony_agent/deploy", "harmony_node_readiness", "harmony-k8s", "examples/vllm",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
|
||||
@@ -14,7 +14,6 @@ async fn main() {
|
||||
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
|
||||
// "default" namespace, 1 instance, 1Gi storage
|
||||
},
|
||||
hostname: "postgrestest.sto1.nationtech.io".to_string(),
|
||||
};
|
||||
|
||||
harmony_cli::run(
|
||||
|
||||
@@ -15,7 +15,6 @@ async fn main() {
|
||||
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
|
||||
// 1 instance, 1Gi storage
|
||||
},
|
||||
hostname: "postgrestest.sto1.nationtech.io".to_string(),
|
||||
};
|
||||
|
||||
let test_connection = PostgreSQLConnectionScore {
|
||||
|
||||
15
examples/vllm/Cargo.toml
Normal file
15
examples/vllm/Cargo.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
[package]
|
||||
name = "vllm"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
harmony = { path = "../../harmony" }
|
||||
harmony_cli = { path = "../../harmony_cli" }
|
||||
k8s-openapi = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
log = { workspace = true }
|
||||
env_logger = { workspace = true }
|
||||
523
examples/vllm/src/main.rs
Normal file
523
examples/vllm/src/main.rs
Normal file
@@ -0,0 +1,523 @@
|
||||
//! vLLM Deployment Example for Qwen3.5-27B-FP8 on NVIDIA RTX 5090
|
||||
//!
|
||||
//! This example deploys vLLM serving Qwen3.5-27B with FP8 quantization,
|
||||
//! optimized for single RTX 5090 (32GB VRAM) with tool calling support.
|
||||
//!
|
||||
//! # Architecture & Memory Constraints
|
||||
//!
|
||||
//! **Model Details:**
|
||||
//! - Parameters: 27B (dense, not sparse/MoE)
|
||||
//! - Quantization: FP8 (8-bit weights)
|
||||
//! - Model size: ~27-28GB in memory
|
||||
//! - Native context: 262,144 tokens (will NOT fit in 32GB VRAM)
|
||||
//!
|
||||
//! **VRAM Budget for RTX 5090 (32GB):**
|
||||
//! - Model weights (FP8): ~27GB
|
||||
//! - Framework overhead: ~1-2GB
|
||||
//! - KV cache: ~2-3GB (for 16k context)
|
||||
//! - CUDA context: ~500MB
|
||||
//! - Temporary buffers: ~500MB
|
||||
//! - **Total: ~31-33GB** (tight fit, leaves minimal headroom)
|
||||
//!
|
||||
//! # OpenShift/OKD Requirements
|
||||
//!
|
||||
//! **SCC (Security Context Constraint) Setup:**
|
||||
//!
|
||||
//! The official vLLM container runs as root and writes to `/root/.cache/huggingface`.
|
||||
//! On OpenShift/OKD with the default restricted SCC, containers run as arbitrary UIDs
|
||||
//! and cannot write to `/root`. For testing, grant the `anyuid` SCC:
|
||||
//!
|
||||
//! ```bash
|
||||
//! # As cluster admin, grant anyuid SCC to the namespace's service account:
|
||||
//! oc adm policy add-scc-to-user anyuid -z default -n vllm-qwen
|
||||
//! ```
|
||||
//!
|
||||
//! This allows pods in the `vllm-qwen` namespace to run as root (UID 0).
|
||||
//! For production, consider building a custom vLLM image that runs as non-root.
|
||||
//!
|
||||
//! # Critical Configuration Notes
|
||||
//!
|
||||
//! 1. **GPU_MEMORY_UTILIZATION=1.0**: Maximum GPU memory allocation.
|
||||
//! NEVER decrease this for dense models - CPU offloading destroys performance
|
||||
//! (100-1000x slower) for models where every parameter is used during inference.
|
||||
//!
|
||||
//! 2. **MAX_MODEL_LEN=16384**: Conservative context length that fits in available VRAM.
|
||||
//! Agentic workflows with long tool call histories will need careful context management.
|
||||
//!
|
||||
//! 3. **--language-model-only**: Skips loading the vision encoder, saving ~1-2GB VRAM.
|
||||
//! Essential for fitting the model in 32GB VRAM.
|
||||
//!
|
||||
//! 4. **PVC Size**: 50Gi for HuggingFace cache. Qwen3.5-27B-FP8 is ~30GB.
|
||||
//!
|
||||
//! # Performance Expectations
|
||||
//!
|
||||
//! - Single token latency: ~50-100ms (no CPU offloading)
|
||||
//! - With CPU offloading: ~5-50 seconds per token (unusable for real-time inference)
|
||||
//! - Throughput: ~10-20 tokens/second (single stream, no batching)
|
||||
//!
|
||||
//! # Next Steps for Production
|
||||
//!
|
||||
//! To increase context length:
|
||||
//! 1. Monitor GPU memory: `kubectl exec -it deployment/qwen3-5-27b -- nvidia-smi dmon -s u`
|
||||
//! 2. If stable, increase MAX_MODEL_LEN (try 32768, then 65536)
|
||||
//! 3. If OOM: revert to lower value
|
||||
//!
|
||||
//! For full 262k context, consider:
|
||||
//! - Multi-GPU setup with tensor parallelism (--tensor-parallel-size 8)
|
||||
//! - Or use a smaller model (Qwen3.5-7B-FP8)
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use harmony::{
|
||||
inventory::Inventory,
|
||||
modules::{
|
||||
k8s::resource::K8sResourceScore,
|
||||
okd::{
|
||||
crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig},
|
||||
route::OKDRouteScore,
|
||||
},
|
||||
},
|
||||
score::Score,
|
||||
topology::{K8sAnywhereTopology, TlsRouter},
|
||||
};
|
||||
use k8s_openapi::{
|
||||
api::{
|
||||
apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy},
|
||||
core::v1::{
|
||||
Container, ContainerPort, EmptyDirVolumeSource, EnvVar, EnvVarSource,
|
||||
HTTPGetAction, PersistentVolumeClaim, PersistentVolumeClaimSpec,
|
||||
PersistentVolumeClaimVolumeSource, PodSpec, PodTemplateSpec, Probe,
|
||||
ResourceRequirements, Secret, SecretKeySelector, SecretVolumeSource, Service,
|
||||
ServicePort, ServiceSpec, Volume, VolumeMount, VolumeResourceRequirements,
|
||||
},
|
||||
},
|
||||
apimachinery::pkg::{
|
||||
api::resource::Quantity,
|
||||
apis::meta::v1::{LabelSelector, ObjectMeta},
|
||||
util::intstr::IntOrString,
|
||||
},
|
||||
ByteString,
|
||||
};
|
||||
use log::info;
|
||||
|
||||
const NAMESPACE: &str = "vllm-qwen";
|
||||
const MODEL_NAME: &str = "Qwen/Qwen3.5-27B-FP8";
|
||||
const DEPLOYMENT_NAME: &str = "qwen3-5-27b";
|
||||
const SERVICE_NAME: &str = DEPLOYMENT_NAME;
|
||||
const ROUTE_NAME: &str = DEPLOYMENT_NAME;
|
||||
const PVC_NAME: &str = "huggingface-cache";
|
||||
const SECRET_NAME: &str = "hf-token-secret";
|
||||
|
||||
const VLLM_IMAGE: &str = "vllm/vllm-openai:latest";
|
||||
const SERVICE_PORT: u16 = 8000;
|
||||
const TARGET_PORT: u16 = 8000;
|
||||
|
||||
/// Maximum context length for the model (in tokens).
|
||||
///
|
||||
/// **Impact on VRAM:**
|
||||
/// - Qwen3.5-27B uses per-token KV cache storage for the context window
|
||||
/// - Larger context = more KV cache memory required
|
||||
/// - Approximate KV cache per token: ~32KB for FP8 (very rough estimate)
|
||||
/// - 16k tokens ≈ 0.5-1GB KV cache
|
||||
/// - 262k tokens ≈ 8-16GB KV cache (native context length - will NOT fit in 32GB VRAM)
|
||||
///
|
||||
/// **Performance Impact:**
|
||||
/// - Context length directly impacts memory for storing conversation history
|
||||
/// - Agentic workflows with long tool call histories benefit from more context
|
||||
/// - If context > available VRAM, vLLM will OOM and fail to start
|
||||
///
|
||||
/// **Recommendations for RTX 5090 (32GB):**
|
||||
/// - Start with 16384 (conservative, should work)
|
||||
/// - If no OOM, try 32768 (better for agentic workflows)
|
||||
/// - Monitor GPU memory with `nvidia-smi` during operation
|
||||
const MAX_MODEL_LEN: i64 = 16384;
|
||||
|
||||
/// Fraction of GPU memory to allocate for the model (0.0 to 1.0).
|
||||
///
|
||||
/// **CRITICAL WARNING: This is a dense model!**
|
||||
/// Qwen3.5-27B-FP8 is NOT a sparse/mixture-of-experts model. All 27B parameters
|
||||
/// are active during inference. CPU offloading will DESTROY performance.
|
||||
///
|
||||
/// **What this parameter controls:**
|
||||
/// - Controls how much of GPU memory vLLM pre-allocates for:
|
||||
/// 1. Model weights (~27GB for FP8 quantization)
|
||||
/// 2. KV cache for context window
|
||||
/// 3. Activation buffers for inference
|
||||
/// 4. Runtime overhead
|
||||
///
|
||||
/// **VRAM Allocation Example:**
|
||||
/// - GPU: 32GB RTX 5090
|
||||
/// - GPU_MEMORY_UTILIZATION: 0.95
|
||||
/// - vLLM will try to use: 32GB * 0.95 = 30.4GB
|
||||
/// - Model weights: ~27-28GB
|
||||
/// - Remaining for KV cache + runtime: ~2-3GB
|
||||
///
|
||||
/// **If set too LOW (e.g., 0.7):**
|
||||
/// - vLLM restricts itself to 32GB * 0.7 = 22.4GB
|
||||
/// - Model weights alone need ~27GB
|
||||
/// - vLLM will OFFLOAD model weights to CPU memory
|
||||
/// - Performance: **100-1000x slower** (single token generation can take seconds instead of milliseconds)
|
||||
/// - This is catastrophic for a dense model where every layer needs all parameters
|
||||
///
|
||||
/// **If set too HIGH (e.g., 0.99):**
|
||||
/// - vLLM tries to allocate nearly all GPU memory
|
||||
/// - Risk: CUDA OOM if any other process needs GPU memory
|
||||
/// - Risk: KV cache allocation fails during inference
|
||||
/// - System instability
|
||||
///
|
||||
/// **Current Setting: 0.95**
|
||||
/// - Leaves 5% buffer (1.6GB) for CUDA overhead, system processes
|
||||
/// - Maximum allocation for model + KV cache: ~30.4GB
|
||||
/// - Should leave enough headroom for:
|
||||
/// - CUDA context: ~500MB
|
||||
/// - Temporary buffers: ~500MB
|
||||
/// - Safety margin: ~600MB
|
||||
///
|
||||
/// **How to tune:**
|
||||
/// 1. Start with 0.95 (current setting)
|
||||
/// 2. Monitor with `nvidia-smi dmon -s u` during operation
|
||||
/// 3. If OOM during inference: reduce MAX_MODEL_LEN first
|
||||
/// 4. If stable: try increasing MAX_MODEL_LEN before increasing this
|
||||
/// 5. Only increase this if you're certain no other GPU processes run
|
||||
///
|
||||
/// **NEVER decrease this for dense models!**
|
||||
/// If model doesn't fit, use a smaller model or quantization, not CPU offloading.
|
||||
const GPU_MEMORY_UTILIZATION : f32 = 1.0;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
env_logger::init();
|
||||
|
||||
info!("Deploying vLLM with Qwen3.5-27B-FP8 model");
|
||||
info!("Configuration:");
|
||||
info!(" Model: {}", MODEL_NAME);
|
||||
info!(" Max context length: {} tokens", MAX_MODEL_LEN);
|
||||
info!(" GPU memory utilization: {}", GPU_MEMORY_UTILIZATION);
|
||||
info!(" Language model only: true");
|
||||
info!(" Tool calling enabled: true");
|
||||
|
||||
let topology = K8sAnywhereTopology::from_env();
|
||||
let domain = topology
|
||||
.get_internal_domain()
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
.unwrap_or_else(|| "cluster.local".to_string());
|
||||
|
||||
let host = format!("{}-{}.apps.{}", SERVICE_NAME, NAMESPACE, domain);
|
||||
info!("Creating route with host: {}", host);
|
||||
|
||||
let scores: Vec<Box<dyn Score<K8sAnywhereTopology>>> = vec![
|
||||
create_namespace(),
|
||||
create_pvc(),
|
||||
create_secret(),
|
||||
create_deployment(),
|
||||
create_service(),
|
||||
create_route(&host),
|
||||
];
|
||||
|
||||
harmony_cli::run(Inventory::autoload(), topology, scores, None)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to deploy: {}", e))?;
|
||||
|
||||
info!("Successfully deployed vLLM with Qwen3.5-27B-FP8");
|
||||
info!("Access the API at: http://{}.apps.<cluster-domain>", SERVICE_NAME);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_namespace() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
use k8s_openapi::api::core::v1::Namespace;
|
||||
|
||||
let namespace = Namespace {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(NAMESPACE.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: None,
|
||||
status: None,
|
||||
};
|
||||
|
||||
Box::new(K8sResourceScore::single(namespace, None))
|
||||
}
|
||||
|
||||
fn create_pvc() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
let pvc = PersistentVolumeClaim {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(PVC_NAME.to_string()),
|
||||
namespace: Some(NAMESPACE.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(PersistentVolumeClaimSpec {
|
||||
access_modes: Some(vec!["ReadWriteOnce".to_string()]),
|
||||
resources: Some(VolumeResourceRequirements {
|
||||
requests: Some(BTreeMap::from([(
|
||||
"storage".to_string(),
|
||||
Quantity("50Gi".to_string()),
|
||||
)])),
|
||||
limits: None,
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
status: None,
|
||||
};
|
||||
|
||||
Box::new(K8sResourceScore::single(
|
||||
pvc,
|
||||
Some(NAMESPACE.to_string()),
|
||||
))
|
||||
}
|
||||
|
||||
fn create_secret() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
let mut data = BTreeMap::new();
|
||||
data.insert(
|
||||
"token".to_string(),
|
||||
ByteString("".to_string().into_bytes()),
|
||||
);
|
||||
|
||||
let secret = Secret {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(SECRET_NAME.to_string()),
|
||||
namespace: Some(NAMESPACE.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
data: Some(data),
|
||||
immutable: Some(false),
|
||||
type_: Some("Opaque".to_string()),
|
||||
string_data: None,
|
||||
};
|
||||
|
||||
Box::new(K8sResourceScore::single(
|
||||
secret,
|
||||
Some(NAMESPACE.to_string()),
|
||||
))
|
||||
}
|
||||
|
||||
fn create_deployment() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
let deployment = Deployment {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(DEPLOYMENT_NAME.to_string()),
|
||||
namespace: Some(NAMESPACE.to_string()),
|
||||
labels: Some(BTreeMap::from([(
|
||||
"app".to_string(),
|
||||
DEPLOYMENT_NAME.to_string(),
|
||||
)])),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(DeploymentSpec {
|
||||
replicas: Some(1),
|
||||
selector: LabelSelector {
|
||||
match_labels: Some(BTreeMap::from([(
|
||||
"app".to_string(),
|
||||
DEPLOYMENT_NAME.to_string(),
|
||||
)])),
|
||||
..Default::default()
|
||||
},
|
||||
strategy: Some(DeploymentStrategy {
|
||||
type_: Some("Recreate".to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
template: PodTemplateSpec {
|
||||
metadata: Some(ObjectMeta {
|
||||
labels: Some(BTreeMap::from([(
|
||||
"app".to_string(),
|
||||
DEPLOYMENT_NAME.to_string(),
|
||||
)])),
|
||||
..Default::default()
|
||||
}),
|
||||
spec: Some(PodSpec {
|
||||
node_selector: Some(BTreeMap::from([(
|
||||
"nvidia.com/gpu.product".to_string(),
|
||||
"NVIDIA-GeForce-RTX-5090".to_string(),
|
||||
)])),
|
||||
volumes: Some(vec![
|
||||
Volume {
|
||||
name: "cache-volume".to_string(),
|
||||
persistent_volume_claim: Some(PersistentVolumeClaimVolumeSource {
|
||||
claim_name: PVC_NAME.to_string(),
|
||||
read_only: Some(false),
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
Volume {
|
||||
name: "shm".to_string(),
|
||||
empty_dir: Some(EmptyDirVolumeSource {
|
||||
medium: Some("Memory".to_string()),
|
||||
size_limit: Some(Quantity("4Gi".to_string())),
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
Volume {
|
||||
name: "hf-token".to_string(),
|
||||
secret: Some(SecretVolumeSource {
|
||||
secret_name: Some(SECRET_NAME.to_string()),
|
||||
optional: Some(true),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
]),
|
||||
containers: vec![Container {
|
||||
name: DEPLOYMENT_NAME.to_string(),
|
||||
image: Some(VLLM_IMAGE.to_string()),
|
||||
command: Some(vec!["/bin/sh".to_string(), "-c".to_string()]),
|
||||
args: Some(vec![build_vllm_command()]),
|
||||
env: Some(vec![
|
||||
EnvVar {
|
||||
name: "HF_TOKEN".to_string(),
|
||||
value_from: Some(EnvVarSource {
|
||||
secret_key_ref: Some(SecretKeySelector {
|
||||
key: "token".to_string(),
|
||||
name: SECRET_NAME.to_string(),
|
||||
optional: Some(true),
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
value: None,
|
||||
},
|
||||
EnvVar {
|
||||
name: "VLLM_WORKER_MULTIPROC_METHOD".to_string(),
|
||||
value: Some("spawn".to_string()),
|
||||
value_from: None,
|
||||
},
|
||||
]),
|
||||
ports: Some(vec![ContainerPort {
|
||||
container_port: SERVICE_PORT as i32,
|
||||
protocol: Some("TCP".to_string()),
|
||||
..Default::default()
|
||||
}]),
|
||||
resources: Some(ResourceRequirements {
|
||||
limits: Some(BTreeMap::from([
|
||||
("cpu".to_string(), Quantity("10".to_string())),
|
||||
("memory".to_string(), Quantity("30Gi".to_string())),
|
||||
("nvidia.com/gpu".to_string(), Quantity("1".to_string())),
|
||||
])),
|
||||
requests: Some(BTreeMap::from([
|
||||
("cpu".to_string(), Quantity("2".to_string())),
|
||||
("memory".to_string(), Quantity("10Gi".to_string())),
|
||||
("nvidia.com/gpu".to_string(), Quantity("1".to_string())),
|
||||
])),
|
||||
claims: None,
|
||||
}),
|
||||
volume_mounts: Some(vec![
|
||||
VolumeMount {
|
||||
name: "cache-volume".to_string(),
|
||||
mount_path: "/root/.cache/huggingface".to_string(),
|
||||
read_only: Some(false),
|
||||
..Default::default()
|
||||
},
|
||||
VolumeMount {
|
||||
name: "shm".to_string(),
|
||||
mount_path: "/dev/shm".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
VolumeMount {
|
||||
name: "hf-token".to_string(),
|
||||
mount_path: "/etc/secrets/hf-token".to_string(),
|
||||
read_only: Some(true),
|
||||
..Default::default()
|
||||
},
|
||||
]),
|
||||
liveness_probe: Some(Probe {
|
||||
http_get: Some(HTTPGetAction {
|
||||
path: Some("/health".to_string()),
|
||||
port: IntOrString::Int(SERVICE_PORT as i32),
|
||||
..Default::default()
|
||||
}),
|
||||
initial_delay_seconds: Some(300),
|
||||
period_seconds: Some(30),
|
||||
..Default::default()
|
||||
}),
|
||||
readiness_probe: Some(Probe {
|
||||
http_get: Some(HTTPGetAction {
|
||||
path: Some("/health".to_string()),
|
||||
port: IntOrString::Int(SERVICE_PORT as i32),
|
||||
..Default::default()
|
||||
}),
|
||||
initial_delay_seconds: Some(120),
|
||||
period_seconds: Some(10),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
}],
|
||||
..Default::default()
|
||||
}),
|
||||
},
|
||||
..Default::default()
|
||||
}),
|
||||
status: None,
|
||||
};
|
||||
|
||||
Box::new(K8sResourceScore::single(
|
||||
deployment,
|
||||
Some(NAMESPACE.to_string()),
|
||||
))
|
||||
}
|
||||
|
||||
fn build_vllm_command() -> String {
|
||||
format!(
|
||||
"vllm serve {} \
|
||||
--port {} \
|
||||
--max-model-len {} \
|
||||
--gpu-memory-utilization {} \
|
||||
--reasoning-parser qwen3 \
|
||||
--enable-auto-tool-choice \
|
||||
--tool-call-parser qwen3_coder \
|
||||
--language-model-only",
|
||||
MODEL_NAME, SERVICE_PORT, MAX_MODEL_LEN, GPU_MEMORY_UTILIZATION
|
||||
)
|
||||
}
|
||||
|
||||
fn create_service() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
let service = Service {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(SERVICE_NAME.to_string()),
|
||||
namespace: Some(NAMESPACE.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(ServiceSpec {
|
||||
ports: Some(vec![ServicePort {
|
||||
name: Some("http".to_string()),
|
||||
port: SERVICE_PORT as i32,
|
||||
protocol: Some("TCP".to_string()),
|
||||
target_port: Some(IntOrString::Int(TARGET_PORT as i32)),
|
||||
..Default::default()
|
||||
}]),
|
||||
selector: Some(BTreeMap::from([(
|
||||
"app".to_string(),
|
||||
DEPLOYMENT_NAME.to_string(),
|
||||
)])),
|
||||
type_: Some("ClusterIP".to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
status: None,
|
||||
};
|
||||
|
||||
Box::new(K8sResourceScore::single(
|
||||
service,
|
||||
Some(NAMESPACE.to_string()),
|
||||
))
|
||||
}
|
||||
|
||||
fn create_route(host: &str) -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
let route_spec = RouteSpec {
|
||||
to: RouteTargetReference {
|
||||
kind: "Service".to_string(),
|
||||
name: SERVICE_NAME.to_string(),
|
||||
weight: Some(100),
|
||||
},
|
||||
host: Some(host.to_string()),
|
||||
port: Some(RoutePort {
|
||||
target_port: SERVICE_PORT as u16,
|
||||
}),
|
||||
tls: Some(TLSConfig {
|
||||
termination: "edge".to_string(),
|
||||
insecure_edge_termination_policy: Some("Redirect".to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
wildcard_policy: None,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
Box::new(OKDRouteScore::new(ROUTE_NAME, NAMESPACE, route_spec))
|
||||
}
|
||||
@@ -109,6 +109,13 @@ impl K8sclient for K8sAnywhereTopology {
|
||||
|
||||
#[async_trait]
|
||||
impl TlsRouter for K8sAnywhereTopology {
|
||||
async fn get_public_domain(&self) -> Result<String, String> {
|
||||
match &self.config.public_domain {
|
||||
Some(public_domain) => Ok(public_domain.to_string()),
|
||||
None => Err("Public domain not available".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_internal_domain(&self) -> Result<Option<String>, String> {
|
||||
match self.get_k8s_distribution().await.map_err(|e| {
|
||||
format!(
|
||||
@@ -1124,6 +1131,7 @@ pub struct K8sAnywhereConfig {
|
||||
///
|
||||
/// If the context name is not found, it will fail to initialize.
|
||||
pub k8s_context: Option<String>,
|
||||
public_domain: Option<String>,
|
||||
}
|
||||
|
||||
impl K8sAnywhereConfig {
|
||||
@@ -1151,6 +1159,7 @@ impl K8sAnywhereConfig {
|
||||
|
||||
let mut kubeconfig: Option<String> = None;
|
||||
let mut k8s_context: Option<String> = None;
|
||||
let mut public_domain: Option<String> = None;
|
||||
|
||||
for part in env_var_value.split(',') {
|
||||
let kv: Vec<&str> = part.splitn(2, '=').collect();
|
||||
@@ -1158,6 +1167,7 @@ impl K8sAnywhereConfig {
|
||||
match kv[0].trim() {
|
||||
"kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()),
|
||||
"context" => k8s_context = Some(kv[1].trim().to_string()),
|
||||
"public_domain" => public_domain = Some(kv[1].trim().to_string()),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -1175,6 +1185,7 @@ impl K8sAnywhereConfig {
|
||||
K8sAnywhereConfig {
|
||||
kubeconfig,
|
||||
k8s_context,
|
||||
public_domain,
|
||||
use_system_kubeconfig,
|
||||
autoinstall: false,
|
||||
use_local_k3d: false,
|
||||
@@ -1217,6 +1228,7 @@ impl K8sAnywhereConfig {
|
||||
use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D")
|
||||
.map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)),
|
||||
k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(),
|
||||
public_domain: std::env::var("HARMONY_PUBLIC_DOMAIN").ok(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -122,4 +122,6 @@ pub trait TlsRouter: Send + Sync {
|
||||
|
||||
/// Returns the port that this router exposes externally.
|
||||
async fn get_router_port(&self) -> u16;
|
||||
|
||||
async fn get_public_domain(&self) -> Result<String, String>;
|
||||
}
|
||||
|
||||
@@ -267,10 +267,16 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
||||
SSL::Default => "".into(),
|
||||
SSL::Other(other) => other.as_str().into(),
|
||||
};
|
||||
let path_without_query = path.split_once('?').map_or(path.as_str(), |(p, _)| p);
|
||||
let (port, port_name) = match port {
|
||||
Some(port) => (Some(port.to_string()), port.to_string()),
|
||||
None => (None, "serverport".to_string()),
|
||||
};
|
||||
|
||||
let haproxy_check = HAProxyHealthCheck {
|
||||
name: format!("HTTP_{http_method}_{path}"),
|
||||
name: format!("HTTP_{http_method}_{path_without_query}_{port_name}"),
|
||||
uuid: Uuid::new_v4().to_string(),
|
||||
http_method: http_method.to_string().into(),
|
||||
http_method: http_method.to_string().to_lowercase().into(),
|
||||
health_check_type: "http".to_string(),
|
||||
http_uri: path.clone().into(),
|
||||
interval: "2s".to_string(),
|
||||
@@ -314,7 +320,10 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
||||
let mut backend = HAProxyBackend {
|
||||
uuid: Uuid::new_v4().to_string(),
|
||||
enabled: 1,
|
||||
name: format!("backend_{}", service.listening_port),
|
||||
name: format!(
|
||||
"backend_{}",
|
||||
service.listening_port.to_string().replace(':', "_")
|
||||
),
|
||||
algorithm: "roundrobin".to_string(),
|
||||
random_draws: Some(2),
|
||||
stickiness_expire: "30m".to_string(),
|
||||
@@ -346,10 +355,22 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
||||
let frontend = Frontend {
|
||||
uuid: uuid::Uuid::new_v4().to_string(),
|
||||
enabled: 1,
|
||||
name: format!("frontend_{}", service.listening_port),
|
||||
name: format!(
|
||||
"frontend_{}",
|
||||
service.listening_port.to_string().replace(':', "_")
|
||||
),
|
||||
bind: service.listening_port.to_string(),
|
||||
mode: "tcp".to_string(), // TODO do not depend on health check here
|
||||
default_backend: Some(backend.uuid.clone()),
|
||||
stickiness_expire: "30m".to_string().into(),
|
||||
stickiness_size: "50k".to_string().into(),
|
||||
stickiness_conn_rate_period: "10s".to_string().into(),
|
||||
stickiness_sess_rate_period: "10s".to_string().into(),
|
||||
stickiness_http_req_rate_period: "10s".to_string().into(),
|
||||
stickiness_http_err_rate_period: "10s".to_string().into(),
|
||||
stickiness_bytes_in_rate_period: "1m".to_string().into(),
|
||||
stickiness_bytes_out_rate_period: "1m".to_string().into(),
|
||||
ssl_hsts_max_age: 15768000,
|
||||
..Default::default()
|
||||
};
|
||||
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::fs::{self};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -65,6 +64,7 @@ pub struct RustWebapp {
|
||||
///
|
||||
/// This is the place to put the public host name if this is a public facing webapp.
|
||||
pub dns: String,
|
||||
pub version: String,
|
||||
}
|
||||
|
||||
impl Application for RustWebapp {
|
||||
@@ -465,6 +465,7 @@ impl RustWebapp {
|
||||
|
||||
let app_name = &self.name;
|
||||
let service_port = self.service_port;
|
||||
let version = &self.version;
|
||||
// Create Chart.yaml
|
||||
let chart_yaml = format!(
|
||||
r#"
|
||||
@@ -472,7 +473,7 @@ apiVersion: v2
|
||||
name: {chart_name}
|
||||
description: A Helm chart for the {app_name} web application.
|
||||
type: application
|
||||
version: 0.2.1
|
||||
version: {version}
|
||||
appVersion: "{image_tag}"
|
||||
"#,
|
||||
);
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio_retry::{Retry, strategy::ExponentialBackoff};
|
||||
|
||||
use crate::modules::{
|
||||
cert_manager::{
|
||||
capability::{CertificateManagement, CertificateManagementConfig},
|
||||
@@ -69,9 +73,28 @@ where
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
self.topology
|
||||
.get_ca_certificate(root_ca_cert_name.into(), &root_ca_config)
|
||||
.await
|
||||
.map_err(|e| e.to_string())
|
||||
let strategy = ExponentialBackoff::from_millis(250)
|
||||
.factor(2)
|
||||
.max_delay(Duration::from_millis(1000))
|
||||
.take(10);
|
||||
|
||||
Retry::spawn(strategy, || async {
|
||||
log::debug!("Attempting CA cert fetch");
|
||||
|
||||
let res = self
|
||||
.topology
|
||||
.get_ca_certificate(root_ca_cert_name.into(), &root_ca_config)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(cert) => Ok(cert),
|
||||
Err(e) => {
|
||||
log::warn!("Retryable error: {:?}", e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| format!("Retries exhausted: {:?}", e))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,20 @@ use log::warn;
|
||||
use crate::topology::{FailoverTopology, TlsRoute, TlsRouter};
|
||||
|
||||
#[async_trait]
|
||||
impl<T: TlsRouter> TlsRouter for FailoverTopology<T> {
|
||||
impl<T: TlsRouter + Send> TlsRouter for FailoverTopology<T> {
|
||||
async fn get_public_domain(&self) -> Result<String, String> {
|
||||
/*
|
||||
let primary_domain = self
|
||||
.primary
|
||||
.get_public_domain()
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
Ok(primary_domain)
|
||||
*/
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn get_internal_domain(&self) -> Result<Option<String>, String> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use k8s_openapi::api::scheduling::v1::PriorityClass;
|
||||
use k8s_openapi::api::{
|
||||
apps::v1::{DaemonSet, DaemonSetSpec},
|
||||
core::v1::{
|
||||
@@ -144,6 +145,19 @@ impl<T: Topology + K8sclient> Interpret<T> for NodeHealthInterpret {
|
||||
},
|
||||
};
|
||||
|
||||
// PriorityClass
|
||||
let priority_class_name = "node-healthcheck-critical".to_string();
|
||||
let priority_class = PriorityClass {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(priority_class_name.clone()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
value: 1000000000,
|
||||
global_default: Some(false),
|
||||
preemption_policy: Some("PreemptLowerPriority".to_string()),
|
||||
description: Some("Highest priority for node health check daemonset - can preempt lower priority pods".to_string()),
|
||||
};
|
||||
|
||||
// DaemonSet
|
||||
let mut daemonset_labels = BTreeMap::new();
|
||||
daemonset_labels.insert("app".to_string(), "node-healthcheck".to_string());
|
||||
@@ -168,6 +182,7 @@ impl<T: Topology + K8sclient> Interpret<T> for NodeHealthInterpret {
|
||||
spec: Some(PodSpec {
|
||||
service_account_name: Some(service_account_name.clone()),
|
||||
host_network: Some(true),
|
||||
priority_class_name: Some(priority_class_name),
|
||||
tolerations: Some(vec![Toleration {
|
||||
operator: Some("Exists".to_string()),
|
||||
..Toleration::default()
|
||||
@@ -182,6 +197,7 @@ impl<T: Topology + K8sclient> Interpret<T> for NodeHealthInterpret {
|
||||
name: "NODE_NAME".to_string(),
|
||||
value_from: Some(EnvVarSource {
|
||||
field_ref: Some(ObjectFieldSelector {
|
||||
api_version: Some("v1".to_string()),
|
||||
field_path: "spec.nodeName".to_string(),
|
||||
..ObjectFieldSelector::default()
|
||||
}),
|
||||
@@ -233,6 +249,9 @@ impl<T: Topology + K8sclient> Interpret<T> for NodeHealthInterpret {
|
||||
K8sResourceScore::single(cluster_role_binding, None)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
K8sResourceScore::single(priority_class, None)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
K8sResourceScore::single(daemon_set, Some(namespace_name.clone()))
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
|
||||
@@ -37,6 +37,7 @@ pub struct PostgreSQLConfig {
|
||||
/// settings incompatible with the default CNPG behavior.
|
||||
pub namespace: String,
|
||||
}
|
||||
|
||||
impl PostgreSQLConfig {
|
||||
pub fn with_namespace(&self, namespace: &str) -> PostgreSQLConfig {
|
||||
let mut new = self.clone();
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use kube::{CustomResource, api::ObjectMeta};
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -19,6 +20,10 @@ pub struct ClusterSpec {
|
||||
pub image_name: Option<String>,
|
||||
pub storage: Storage,
|
||||
pub bootstrap: Bootstrap,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub external_clusters: Option<Vec<ExternalCluster>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub replica: Option<ReplicaSpec>,
|
||||
/// This must be set to None if you want cnpg to generate a superuser secret
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub superuser_secret: Option<BTreeMap<String, String>>,
|
||||
@@ -41,6 +46,8 @@ impl Default for ClusterSpec {
|
||||
image_name: None,
|
||||
storage: Storage::default(),
|
||||
bootstrap: Bootstrap::default(),
|
||||
external_clusters: None,
|
||||
replica: None,
|
||||
superuser_secret: None,
|
||||
enable_superuser_access: false,
|
||||
}
|
||||
@@ -56,7 +63,13 @@ pub struct Storage {
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Bootstrap {
|
||||
pub initdb: Initdb,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub initdb: Option<Initdb>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub recovery: Option<Recovery>,
|
||||
#[serde(rename = "pg_basebackup")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub pg_basebackup: Option<PgBaseBackup>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
@@ -65,3 +78,50 @@ pub struct Initdb {
|
||||
pub database: String,
|
||||
pub owner: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Recovery {
|
||||
pub source: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
pub struct PgBaseBackup {
|
||||
pub source: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ExternalCluster {
|
||||
pub name: String,
|
||||
pub connection_parameters: HashMap<String, String>,
|
||||
pub ssl_key: Option<SecretKeySelector>,
|
||||
pub ssl_cert: Option<SecretKeySelector>,
|
||||
pub ssl_root_cert: Option<SecretKeySelector>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ConnectionParameters {
|
||||
pub host: String,
|
||||
pub user: String,
|
||||
pub dbname: String,
|
||||
pub sslmode: String,
|
||||
pub sslnegotiation: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ReplicaSpec {
|
||||
pub enabled: bool,
|
||||
pub source: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub primary: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SecretKeySelector {
|
||||
pub name: String,
|
||||
pub key: String,
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ use log::debug;
|
||||
use log::info;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::interpret::InterpretError;
|
||||
use crate::topology::TlsRoute;
|
||||
use crate::topology::TlsRouter;
|
||||
use crate::{
|
||||
modules::postgresql::capability::{
|
||||
@@ -49,8 +51,18 @@ impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
|
||||
// TODO we should be getting the public endpoint for a service by calling a method on
|
||||
// TlsRouter capability.
|
||||
// Something along the lines of `TlsRouter::get_hostname_for_service(...).await?;`
|
||||
let host = format!(
|
||||
"{}.{}.{}",
|
||||
config.cluster_name,
|
||||
config.namespace,
|
||||
self.primary
|
||||
.get_public_domain()
|
||||
.await
|
||||
.expect("failed to retrieve public domain")
|
||||
.to_string()
|
||||
);
|
||||
let endpoint = PostgreSQLEndpoint {
|
||||
host: "postgrestest.sto1.nationtech.io".to_string(),
|
||||
host,
|
||||
port: self.primary.get_router_port().await,
|
||||
};
|
||||
|
||||
@@ -59,6 +71,46 @@ impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
|
||||
endpoint.host, endpoint.port
|
||||
);
|
||||
|
||||
info!("installing primary postgres route");
|
||||
let prim_hostname = format!(
|
||||
"{}.{}.{}",
|
||||
config.cluster_name,
|
||||
config.namespace,
|
||||
self.primary.get_public_domain().await?
|
||||
);
|
||||
let rw_backend = format!("{}-rw", config.cluster_name);
|
||||
let tls_route = TlsRoute {
|
||||
hostname: prim_hostname,
|
||||
backend: rw_backend,
|
||||
target_port: 5432,
|
||||
namespace: config.namespace.clone(),
|
||||
};
|
||||
// Expose RW publicly via TLS passthrough
|
||||
self.primary
|
||||
.install_route(tls_route.clone())
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(e))?;
|
||||
|
||||
info!("installing replica postgres route");
|
||||
let rep_hostname = format!(
|
||||
"{}.{}.{}",
|
||||
config.cluster_name,
|
||||
config.namespace,
|
||||
self.replica.get_public_domain().await?
|
||||
);
|
||||
let rw_backend = format!("{}-rw", config.cluster_name);
|
||||
let tls_route = TlsRoute {
|
||||
hostname: rep_hostname,
|
||||
backend: rw_backend,
|
||||
target_port: 5432,
|
||||
namespace: config.namespace.clone(),
|
||||
};
|
||||
|
||||
// Expose RW publicly via TLS passthrough
|
||||
self.replica
|
||||
.install_route(tls_route.clone())
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(e))?;
|
||||
info!("Configuring replica connection parameters and bootstrap");
|
||||
|
||||
let mut connection_parameters = HashMap::new();
|
||||
|
||||
@@ -1,14 +1,21 @@
|
||||
use std::collections::BTreeMap;
|
||||
use crate::data::Version;
|
||||
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
|
||||
use crate::inventory::Inventory;
|
||||
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::interpret::Interpret;
|
||||
use crate::modules::k8s::resource::K8sResourceScore;
|
||||
use crate::modules::postgresql::capability::PostgreSQLConfig;
|
||||
use crate::modules::postgresql::cnpg::{Bootstrap, Cluster, ClusterSpec, Initdb, Storage};
|
||||
use crate::modules::postgresql::cnpg::{
|
||||
Bootstrap, Cluster, ClusterSpec, ExternalCluster, Initdb, PgBaseBackup, ReplicaSpec,
|
||||
SecretKeySelector, Storage,
|
||||
};
|
||||
use crate::score::Score;
|
||||
use crate::topology::{K8sclient, Topology};
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use k8s_openapi::ByteString;
|
||||
use k8s_openapi::api::core::v1::Secret;
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||
use serde::Serialize;
|
||||
|
||||
/// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG.
|
||||
///
|
||||
@@ -51,37 +58,184 @@ impl K8sPostgreSQLScore {
|
||||
|
||||
impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore {
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
let metadata = ObjectMeta {
|
||||
name: Some(self.config.cluster_name.clone()),
|
||||
namespace: Some(self.config.namespace.clone()),
|
||||
..ObjectMeta::default()
|
||||
};
|
||||
|
||||
let spec = ClusterSpec {
|
||||
instances: self.config.instances,
|
||||
storage: Storage {
|
||||
size: self.config.storage_size.to_string(),
|
||||
},
|
||||
bootstrap: Bootstrap {
|
||||
initdb: Initdb {
|
||||
database: "app".to_string(),
|
||||
owner: "app".to_string(),
|
||||
},
|
||||
},
|
||||
// superuser_secret: Some(BTreeMap::from([(
|
||||
// "name".to_string(),
|
||||
// format!("{}-superuser", self.config.cluster_name.clone()),
|
||||
// )])),
|
||||
enable_superuser_access: true,
|
||||
..ClusterSpec::default()
|
||||
};
|
||||
|
||||
let cluster = Cluster { metadata, spec };
|
||||
|
||||
K8sResourceScore::single(cluster, Some(self.config.namespace.clone())).create_interpret()
|
||||
Box::new(K8sPostgreSQLInterpret {
|
||||
config: self.config.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
format!("PostgreSQLScore({})", self.config.namespace)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct K8sPostgreSQLInterpret {
|
||||
config: PostgreSQLConfig,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + K8sclient> Interpret<T> for K8sPostgreSQLInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
match &self.config.role {
|
||||
super::capability::PostgreSQLClusterRole::Primary => {
|
||||
let metadata = ObjectMeta {
|
||||
name: Some(self.config.cluster_name.clone()),
|
||||
namespace: Some(self.config.namespace.clone()),
|
||||
..ObjectMeta::default()
|
||||
};
|
||||
|
||||
let spec = ClusterSpec {
|
||||
instances: self.config.instances,
|
||||
storage: Storage {
|
||||
size: self.config.storage_size.to_string(),
|
||||
},
|
||||
bootstrap: Bootstrap {
|
||||
initdb: Some(Initdb {
|
||||
database: "app".to_string(),
|
||||
owner: "app".to_string(),
|
||||
}),
|
||||
recovery: None,
|
||||
pg_basebackup: None,
|
||||
},
|
||||
enable_superuser_access: true,
|
||||
..ClusterSpec::default()
|
||||
};
|
||||
let cluster = Cluster { metadata, spec };
|
||||
|
||||
Ok(
|
||||
K8sResourceScore::single(cluster, Some(self.config.namespace.clone()))
|
||||
.create_interpret()
|
||||
.execute(inventory, topology)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
super::capability::PostgreSQLClusterRole::Replica(replica_config) => {
|
||||
let metadata = ObjectMeta {
|
||||
name: Some("streaming-replica-certs".to_string()),
|
||||
namespace: Some(self.config.namespace.clone()),
|
||||
..ObjectMeta::default()
|
||||
};
|
||||
|
||||
// The data must be base64-encoded. If you already have PEM strings in your config, encode them:
|
||||
let mut data = std::collections::BTreeMap::new();
|
||||
data.insert(
|
||||
"tls.key".to_string(),
|
||||
ByteString(
|
||||
replica_config
|
||||
.replication_certs
|
||||
.streaming_replica_key_pem
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
),
|
||||
);
|
||||
data.insert(
|
||||
"tls.crt".to_string(),
|
||||
ByteString(
|
||||
replica_config
|
||||
.replication_certs
|
||||
.streaming_replica_cert_pem
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
),
|
||||
);
|
||||
data.insert(
|
||||
"ca.crt".to_string(),
|
||||
ByteString(
|
||||
replica_config
|
||||
.replication_certs
|
||||
.ca_cert_pem
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
),
|
||||
);
|
||||
|
||||
let secret = Secret {
|
||||
metadata,
|
||||
data: Some(data),
|
||||
string_data: None, // You could use string_data if you prefer raw strings
|
||||
type_: Some("Opaque".to_string()),
|
||||
..Secret::default()
|
||||
};
|
||||
|
||||
K8sResourceScore::single(secret, Some(self.config.namespace.clone()))
|
||||
.create_interpret()
|
||||
.execute(inventory, topology)
|
||||
.await?;
|
||||
|
||||
let metadata = ObjectMeta {
|
||||
name: Some(self.config.cluster_name.clone()),
|
||||
namespace: Some(self.config.namespace.clone()),
|
||||
..ObjectMeta::default()
|
||||
};
|
||||
|
||||
let spec = ClusterSpec {
|
||||
instances: self.config.instances,
|
||||
storage: Storage {
|
||||
size: self.config.storage_size.to_string(),
|
||||
},
|
||||
bootstrap: Bootstrap {
|
||||
initdb: None,
|
||||
recovery: None,
|
||||
pg_basebackup: Some(PgBaseBackup {
|
||||
source: replica_config.primary_cluster_name.clone(),
|
||||
}),
|
||||
},
|
||||
external_clusters: Some(vec![ExternalCluster {
|
||||
name: replica_config.primary_cluster_name.clone(),
|
||||
connection_parameters: replica_config
|
||||
.external_cluster
|
||||
.connection_parameters
|
||||
.clone(),
|
||||
ssl_key: Some(SecretKeySelector {
|
||||
name: "streaming-replica-certs".to_string(),
|
||||
key: "tls.key".to_string(),
|
||||
}),
|
||||
ssl_cert: Some(SecretKeySelector {
|
||||
name: "streaming-replica-certs".to_string(),
|
||||
key: "tls.crt".to_string(),
|
||||
}),
|
||||
ssl_root_cert: Some(SecretKeySelector {
|
||||
name: "streaming-replica-certs".to_string(),
|
||||
key: "ca.crt".to_string(),
|
||||
}),
|
||||
}]),
|
||||
replica: Some(ReplicaSpec {
|
||||
enabled: true,
|
||||
source: replica_config.primary_cluster_name.clone(),
|
||||
primary: None,
|
||||
}),
|
||||
..ClusterSpec::default()
|
||||
};
|
||||
|
||||
let cluster = Cluster { metadata, spec };
|
||||
|
||||
Ok(
|
||||
K8sResourceScore::single(cluster, Some(self.config.namespace.clone()))
|
||||
.create_interpret()
|
||||
.execute(inventory, topology)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("K8sPostgreSQLInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,46 +18,31 @@ use crate::topology::Topology;
|
||||
/// # Usage
|
||||
/// ```
|
||||
/// use harmony::modules::postgresql::PublicPostgreSQLScore;
|
||||
/// let score = PublicPostgreSQLScore::new("harmony", "pg-rw.example.com");
|
||||
/// let score = PublicPostgreSQLScore::new("harmony");
|
||||
/// ```
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct PublicPostgreSQLScore {
|
||||
/// Inner non-public Postgres cluster config.
|
||||
pub config: PostgreSQLConfig,
|
||||
/// Public hostname for RW TLS passthrough (port 443 → cluster-rw:5432).
|
||||
pub hostname: String,
|
||||
}
|
||||
|
||||
impl PublicPostgreSQLScore {
|
||||
pub fn new(namespace: &str, hostname: &str) -> Self {
|
||||
pub fn new(namespace: &str) -> Self {
|
||||
Self {
|
||||
config: PostgreSQLConfig::default().with_namespace(namespace),
|
||||
hostname: hostname.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore {
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
let rw_backend = format!("{}-rw", self.config.cluster_name);
|
||||
let tls_route = TlsRoute {
|
||||
namespace: self.config.namespace.clone(),
|
||||
hostname: self.hostname.clone(),
|
||||
backend: rw_backend,
|
||||
target_port: 5432,
|
||||
};
|
||||
|
||||
Box::new(PublicPostgreSQLInterpret {
|
||||
config: self.config.clone(),
|
||||
tls_route,
|
||||
})
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
format!(
|
||||
"PublicPostgreSQLScore({}:{})",
|
||||
self.config.namespace, self.hostname
|
||||
)
|
||||
format!("PublicPostgreSQLScore({})", self.config.namespace)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,7 +50,6 @@ impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore {
|
||||
#[derive(Debug, Clone)]
|
||||
struct PublicPostgreSQLInterpret {
|
||||
config: PostgreSQLConfig,
|
||||
tls_route: TlsRoute,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -76,15 +60,28 @@ impl<T: Topology + PostgreSQL + TlsRouter> Interpret<T> for PublicPostgreSQLInte
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(e))?;
|
||||
|
||||
let hostname = format!(
|
||||
"{}.{}.{}",
|
||||
self.config.cluster_name,
|
||||
self.config.namespace,
|
||||
topo.get_public_domain().await?
|
||||
);
|
||||
let rw_backend = format!("{}-rw", self.config.cluster_name);
|
||||
let tls_route = TlsRoute {
|
||||
hostname,
|
||||
backend: rw_backend,
|
||||
target_port: 5432,
|
||||
namespace: self.config.namespace.clone(),
|
||||
};
|
||||
// Expose RW publicly via TLS passthrough
|
||||
topo.install_route(self.tls_route.clone())
|
||||
topo.install_route(tls_route.clone())
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(e))?;
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"Public CNPG cluster '{}' deployed with TLS passthrough route '{}'",
|
||||
self.config.cluster_name.clone(),
|
||||
self.tls_route.hostname
|
||||
tls_route.hostname
|
||||
)))
|
||||
}
|
||||
|
||||
|
||||
@@ -344,7 +344,7 @@ pub struct StaticMap {
|
||||
pub mac: String,
|
||||
pub ipaddr: String,
|
||||
pub cid: Option<MaybeString>,
|
||||
pub hostname: String,
|
||||
pub hostname: Option<String>,
|
||||
pub descr: Option<MaybeString>,
|
||||
pub winsserver: MaybeString,
|
||||
pub dnsserver: MaybeString,
|
||||
@@ -383,24 +383,24 @@ pub struct Outbound {
|
||||
|
||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||
pub struct NatRule {
|
||||
pub protocol: String,
|
||||
pub interface: String,
|
||||
pub category: MaybeString,
|
||||
pub ipprotocol: String,
|
||||
pub descr: MaybeString,
|
||||
pub tag: MaybeString,
|
||||
pub protocol: Option<String>,
|
||||
pub interface: Option<String>,
|
||||
pub category: Option<MaybeString>,
|
||||
pub ipprotocol: Option<String>,
|
||||
pub descr: Option<MaybeString>,
|
||||
pub tag: Option<MaybeString>,
|
||||
pub tagged: Option<MaybeString>,
|
||||
pub poolopts: PoolOpts,
|
||||
pub poolopts: Option<PoolOpts>,
|
||||
#[yaserde(rename = "associated-rule-id")]
|
||||
pub associated_rule_id: Option<MaybeString>,
|
||||
pub disabled: Option<u8>,
|
||||
pub target: String,
|
||||
pub target: Option<String>,
|
||||
#[yaserde(rename = "local-port")]
|
||||
pub local_port: i32,
|
||||
pub source: Source,
|
||||
pub destination: Destination,
|
||||
pub updated: Updated,
|
||||
pub created: Created,
|
||||
pub local_port: Option<i32>,
|
||||
pub source: Option<Source>,
|
||||
pub destination: Option<Destination>,
|
||||
pub updated: Option<Updated>,
|
||||
pub created: Option<Created>,
|
||||
}
|
||||
|
||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||
@@ -1545,7 +1545,7 @@ pub struct Vlans {
|
||||
|
||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||
pub struct Bridges {
|
||||
pub bridged: Option<MaybeString>,
|
||||
pub bridged: Option<RawXml>,
|
||||
}
|
||||
|
||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||
|
||||
@@ -48,7 +48,7 @@ impl<'a> DhcpConfigLegacyISC<'a> {
|
||||
hostname: &str,
|
||||
) -> Result<(), DhcpError> {
|
||||
let mac = mac.to_string();
|
||||
let hostname = hostname.to_string();
|
||||
let hostname = Some(hostname.to_string());
|
||||
let lan_dhcpd = self.get_lan_dhcpd();
|
||||
let existing_mappings: &mut Vec<StaticMap> = &mut lan_dhcpd.staticmaps;
|
||||
|
||||
@@ -121,7 +121,7 @@ impl<'a> DhcpConfigLegacyISC<'a> {
|
||||
.map(|entry| StaticMap {
|
||||
mac: entry["mac"].as_str().unwrap_or_default().to_string(),
|
||||
ipaddr: entry["ipaddr"].as_str().unwrap_or_default().to_string(),
|
||||
hostname: entry["hostname"].as_str().unwrap_or_default().to_string(),
|
||||
hostname: Some(entry["hostname"].as_str().unwrap_or_default().to_string()),
|
||||
descr: entry["descr"].as_str().map(MaybeString::from),
|
||||
..Default::default()
|
||||
})
|
||||
|
||||
@@ -213,7 +213,7 @@ impl<'a> DhcpConfigDnsMasq<'a> {
|
||||
.map(|entry| StaticMap {
|
||||
mac: entry["mac"].as_str().unwrap_or_default().to_string(),
|
||||
ipaddr: entry["ipaddr"].as_str().unwrap_or_default().to_string(),
|
||||
hostname: entry["hostname"].as_str().unwrap_or_default().to_string(),
|
||||
hostname: Some(entry["hostname"].as_str().unwrap_or_default().to_string()),
|
||||
descr: entry["descr"].as_str().map(MaybeString::from),
|
||||
..Default::default()
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user