All checks were successful
Run Check Script / check (pull_request) Successful in 1m28s
283 lines
8.3 KiB
Rust
283 lines
8.3 KiB
Rust
use actix_web::{App, HttpResponse, HttpServer, Responder, get, web};
|
|
use k8s_openapi::api::core::v1::Node;
|
|
use kube::{Api, Client, Config};
|
|
|
|
use log::{debug, error, info, warn};
|
|
use reqwest;
|
|
use serde::{Deserialize, Serialize};
|
|
use std::env;
|
|
use std::time::{Duration, Instant};
|
|
use tokio::task::JoinSet;
|
|
|
|
/// Timeout (one second) applied to the Kubernetes client configuration built by the `health` handler.
const K8S_CLIENT_TIMEOUT: Duration = Duration::from_secs(1);
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
struct HealthStatus {
|
|
status: String,
|
|
checks: Vec<CheckResult>,
|
|
total_duration_ms: u128,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
struct CheckResult {
|
|
name: String,
|
|
passed: bool,
|
|
#[serde(skip_serializing_if = "Option::is_none")]
|
|
reason: Option<String>,
|
|
duration_ms: u128,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
struct HealthError {
|
|
status: String,
|
|
error: String,
|
|
}
|
|
|
|
#[derive(Deserialize)]
|
|
struct HealthQuery {
|
|
#[serde(rename = "check")]
|
|
checks: Option<String>,
|
|
}
|
|
|
|
/// Check if the node's Ready condition is true via Kubernetes API
|
|
async fn check_node_ready(client: Client, node_name: &str) -> Result<(), String> {
|
|
let nodes: Api<Node> = Api::all(client);
|
|
|
|
let node = match nodes.get(node_name).await {
|
|
Ok(n) => n,
|
|
Err(e) => {
|
|
warn!(
|
|
"Kubernetes API appears to be down, unreachable, or timed out for node '{}': {}. Assuming node is ready.",
|
|
node_name, e
|
|
);
|
|
return Ok(());
|
|
}
|
|
};
|
|
|
|
let conditions = node.status.and_then(|s| s.conditions).unwrap_or_default();
|
|
|
|
for condition in conditions {
|
|
if condition.type_ == "Ready" {
|
|
let is_ready = condition.status == "True";
|
|
let reason = condition
|
|
.reason
|
|
.clone()
|
|
.unwrap_or_else(|| "Unknown".to_string());
|
|
|
|
if !is_ready {
|
|
return Err(reason);
|
|
}
|
|
return Ok(());
|
|
}
|
|
}
|
|
|
|
Err("Ready condition not found".to_string())
|
|
}
|
|
|
|
/// Check OKD router health endpoint on port 1936
|
|
async fn check_okd_router_1936() -> Result<(), String> {
|
|
debug!("Checking okd router 1936");
|
|
let client = reqwest::Client::builder()
|
|
.timeout(std::time::Duration::from_secs(5))
|
|
.build()
|
|
.map_err(|e| format!("Failed to build HTTP client: {}", e))?;
|
|
|
|
let response = client
|
|
.get("http://127.0.0.1:1936/healthz/ready")
|
|
.send()
|
|
.await
|
|
.map_err(|e| format!("Failed to connect to OKD router: {}", e))?;
|
|
|
|
debug!("okd router 1936 response status {}", response.status());
|
|
|
|
if response.status().is_success() {
|
|
Ok(())
|
|
} else {
|
|
Err(format!("OKD router returned status: {}", response.status()))
|
|
}
|
|
}
|
|
|
|
/// Parse the comma-separated check names supplied in the `check` query parameter.
///
/// Every entry is trimmed, and blank entries (produced by trailing or doubled
/// commas, or whitespace-only input) are dropped so they are never reported as
/// an unknown check with an empty name.
///
/// Returns `["node_ready"]` when the parameter is absent or contains no
/// non-blank names; otherwise returns the names in the order given.
fn parse_checks(checks_param: Option<&str>) -> Vec<String> {
    let names: Vec<String> = checks_param
        .unwrap_or_default()
        .split(',')
        .map(str::trim)
        .filter(|name| !name.is_empty())
        .map(str::to_owned)
        .collect();

    if names.is_empty() {
        vec!["node_ready".to_string()]
    } else {
        names
    }
}
|
|
|
|
/// Run a single health check by name and return the result
|
|
async fn run_check(check_name: &str, client: Option<Client>, node_name: &str) -> CheckResult {
|
|
let start = Instant::now();
|
|
|
|
let result = match check_name {
|
|
"node_ready" => match client {
|
|
Some(c) => check_node_ready(c, node_name).await,
|
|
None => {
|
|
warn!(
|
|
"Kubernetes client not available for node '{}'. Assuming node is ready.",
|
|
node_name
|
|
);
|
|
Ok(())
|
|
}
|
|
},
|
|
"okd_router_1936" => check_okd_router_1936().await,
|
|
_ => Err(format!("Unknown check: {}", check_name)),
|
|
};
|
|
|
|
let duration_ms = start.elapsed().as_millis();
|
|
|
|
match result {
|
|
Ok(()) => CheckResult {
|
|
name: check_name.to_string(),
|
|
passed: true,
|
|
reason: None,
|
|
duration_ms,
|
|
},
|
|
Err(reason) => CheckResult {
|
|
name: check_name.to_string(),
|
|
passed: false,
|
|
reason: Some(reason),
|
|
duration_ms,
|
|
},
|
|
}
|
|
}
|
|
|
|
#[get("/health")]
|
|
async fn health(query: web::Query<HealthQuery>) -> impl Responder {
|
|
let node_name = match env::var("NODE_NAME") {
|
|
Ok(name) => name,
|
|
Err(_) => {
|
|
error!("NODE_NAME environment variable not set");
|
|
return HttpResponse::InternalServerError().json(HealthError {
|
|
status: "error".to_string(),
|
|
error: "NODE_NAME environment variable not set".to_string(),
|
|
});
|
|
}
|
|
};
|
|
|
|
// Parse requested checks from query parameter
|
|
let requested_checks = parse_checks(query.checks.as_deref());
|
|
|
|
// Check if node_ready check requires Kubernetes client
|
|
let needs_k8s_client = requested_checks.contains(&"node_ready".to_string());
|
|
|
|
// Initialize Kubernetes client only if needed
|
|
let k8s_client = if needs_k8s_client {
|
|
match Config::infer().await {
|
|
Ok(mut config) => {
|
|
config.write_timeout = Some(K8S_CLIENT_TIMEOUT);
|
|
config.connect_timeout = Some(K8S_CLIENT_TIMEOUT);
|
|
Some(Client::try_from(config).map_err(|e| e.to_string()))
|
|
}
|
|
Err(e) => {
|
|
warn!(
|
|
"Failed to infer Kubernetes config for node '{}': {}. Assuming node_ready is healthy.",
|
|
node_name, e
|
|
);
|
|
None
|
|
}
|
|
}
|
|
.and_then(|result| match result {
|
|
Ok(client) => Some(client),
|
|
Err(e) => {
|
|
warn!(
|
|
"Failed to create Kubernetes client for node '{}': {}. Assuming node_ready is healthy.",
|
|
node_name, e
|
|
);
|
|
None
|
|
}
|
|
})
|
|
} else {
|
|
None
|
|
};
|
|
|
|
// Run all requested checks in parallel
|
|
let start = Instant::now();
|
|
let mut join_set = JoinSet::new();
|
|
debug!("Running checks {requested_checks:?}");
|
|
|
|
for check_name in requested_checks {
|
|
let client = k8s_client.clone();
|
|
let node_name = node_name.clone();
|
|
join_set.spawn(async move { run_check(&check_name, client, &node_name).await });
|
|
}
|
|
let mut check_results = Vec::new();
|
|
while let Some(result) = join_set.join_next().await {
|
|
match result {
|
|
Ok(check) => check_results.push(check),
|
|
Err(e) => error!("Check task failed: {}", e),
|
|
}
|
|
}
|
|
let total_duration_ms = start.elapsed().as_millis();
|
|
|
|
// Determine overall status
|
|
let all_passed = check_results.iter().all(|c| c.passed);
|
|
|
|
if all_passed {
|
|
info!(
|
|
"All health checks passed for node '{}' in {}ms",
|
|
node_name, total_duration_ms
|
|
);
|
|
HttpResponse::Ok().json(HealthStatus {
|
|
status: "ready".to_string(),
|
|
checks: check_results,
|
|
total_duration_ms,
|
|
})
|
|
} else {
|
|
let failed_checks: Vec<&str> = check_results
|
|
.iter()
|
|
.filter(|c| !c.passed)
|
|
.map(|c| c.name.as_str())
|
|
.collect();
|
|
warn!(
|
|
"Health checks failed for node '{}' in {}ms: {:?}",
|
|
node_name, total_duration_ms, failed_checks
|
|
);
|
|
HttpResponse::ServiceUnavailable().json(HealthStatus {
|
|
status: "not-ready".to_string(),
|
|
checks: check_results,
|
|
total_duration_ms,
|
|
})
|
|
}
|
|
}
|
|
|
|
#[actix_web::main]
|
|
async fn main() -> std::io::Result<()> {
|
|
env_logger::init();
|
|
|
|
let port = env::var("LISTEN_PORT").unwrap_or_else(|_| "25001".to_string());
|
|
let port = port
|
|
.parse::<u16>()
|
|
.unwrap_or_else(|_| panic!("Invalid port number: {}", port));
|
|
let bind_addr = format!("0.0.0.0:{}", port);
|
|
|
|
info!("Starting harmony-node-readiness-endpoint on {}", bind_addr);
|
|
|
|
HttpServer::new(|| App::new().service(health))
|
|
.workers(3)
|
|
.bind(&bind_addr)?
|
|
.run()
|
|
.await
|
|
}
|
|
|
|
/// Unit tests for the query-parameter parsing helper.
#[cfg(test)]
mod tests {
    use super::*;

    /// A missing or empty parameter selects the default `node_ready` check.
    #[test]
    fn parse_checks_defaults_to_node_ready() {
        assert_eq!(parse_checks(None), vec!["node_ready"]);
        assert_eq!(parse_checks(Some("")), vec!["node_ready"]);
    }

    /// Names are split on commas and surrounding whitespace is removed.
    #[test]
    fn parse_checks_splits_and_trims_values() {
        assert_eq!(
            parse_checks(Some("node_ready, okd_router_1936 ")),
            vec!["node_ready", "okd_router_1936"]
        );
    }

    /// A single name without commas is returned unchanged.
    #[test]
    fn parse_checks_keeps_single_value() {
        assert_eq!(
            parse_checks(Some("okd_router_1936")),
            vec!["okd_router_1936"]
        );
    }
}
|