Compare commits

4 Commits: feat/ceph-... → 84f38974b1

| SHA1 | Author | Date |
|---|---|---|
| 84f38974b1 | | |
| 7d027bcfc4 | | |
| 610ce84280 | | |
| 8bb4a9d3f6 | | |

Cargo.lock (generated)
@@ -1834,6 +1834,7 @@ name = "harmony_cli"
 version = "0.1.0"
 dependencies = [
     "assert_cmd",
+    "chrono",
     "clap",
     "console",
     "env_logger",
@@ -20,7 +20,7 @@ readme = "README.md"
 license = "GNU AGPL v3"

 [workspace.dependencies]
-log = "0.4"
+log = { version = "0.4", features = ["kv"] }
 env_logger = "0.11"
 derive-new = "0.7"
 async-trait = "0.1"
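Enabling the `kv` feature is what lets the reworked CLI logger further down in this compare attach structured fields such as `status` and `emoji` to log records. A minimal sketch of the syntax, assuming `log` 0.4 with `kv` and an `env_logger`-style backend:

    use log::info;

    fn main() {
        env_logger::init();
        // With the `kv` feature, `key = value;` pairs ride along with the
        // record and can be read back via `record.key_values()` in a
        // custom format callback.
        info!(status = "finished"; "OSD removal complete");
    }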
BIN examples/application_monitoring_with_tenant/harmony (executable file; binary file not shown)
@@ -8,7 +8,6 @@ use harmony::{
     hardware::{FirewallGroup, HostCategory, Location, PhysicalHost, SwitchGroup},
     infra::opnsense::OPNSenseManagementInterface,
     inventory::Inventory,
-    maestro::Maestro,
     modules::{
         http::StaticFilesHttpScore,
         ipxe::IpxeScore,
@@ -130,16 +129,21 @@ async fn main() {
         "./data/watchguard/pxe-http-files".to_string(),
     ));
     let ipxe_score = IpxeScore::new();
-    let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
-    maestro.register_all(vec![
-        Box::new(dns_score),
-        Box::new(bootstrap_dhcp_score),
-        Box::new(bootstrap_load_balancer_score),
-        Box::new(load_balancer_score),
-        Box::new(tftp_score),
-        Box::new(http_score),
-        Box::new(ipxe_score),
-        Box::new(dhcp_score),
-    ]);
-    harmony_tui::init(maestro).await.unwrap();
+    harmony_tui::run(
+        inventory,
+        topology,
+        vec![
+            Box::new(dns_score),
+            Box::new(bootstrap_dhcp_score),
+            Box::new(bootstrap_load_balancer_score),
+            Box::new(load_balancer_score),
+            Box::new(tftp_score),
+            Box::new(http_score),
+            Box::new(ipxe_score),
+            Box::new(dhcp_score),
+        ],
+    )
+    .await
+    .unwrap();
 }
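The same `init`-to-`run` migration repeats across the examples in this compare. Per the harmony_tui lib.rs hunk near the end, `run` now performs the Maestro setup that call sites previously did by hand; roughly:

    // Before (removed above): callers built the Maestro themselves.
    // let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
    // maestro.register_all(scores);
    // harmony_tui::init(maestro).await.unwrap();

    // After: harmony_tui::run owns that setup internally (see the lib.rs
    // hunk at the bottom of this compare).
    harmony_tui::run(inventory, topology, scores).await.unwrap();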
@@ -8,7 +8,6 @@ use harmony::{
     hardware::{FirewallGroup, HostCategory, Location, PhysicalHost, SwitchGroup},
     infra::opnsense::OPNSenseManagementInterface,
     inventory::Inventory,
-    maestro::Maestro,
     modules::{
         dummy::{ErrorScore, PanicScore, SuccessScore},
         http::StaticFilesHttpScore,
@@ -84,20 +83,25 @@ async fn main() {
     let http_score = StaticFilesHttpScore::new(Url::LocalFolder(
         "./data/watchguard/pxe-http-files".to_string(),
     ));
-    let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
-    maestro.register_all(vec![
-        Box::new(dns_score),
-        Box::new(dhcp_score),
-        Box::new(load_balancer_score),
-        Box::new(tftp_score),
-        Box::new(http_score),
-        Box::new(OPNsenseShellCommandScore {
-            opnsense: opnsense.get_opnsense_config(),
-            command: "touch /tmp/helloharmonytouching".to_string(),
-        }),
-        Box::new(SuccessScore {}),
-        Box::new(ErrorScore {}),
-        Box::new(PanicScore {}),
-    ]);
-    harmony_tui::init(maestro).await.unwrap();
+    harmony_tui::run(
+        inventory,
+        topology,
+        vec![
+            Box::new(dns_score),
+            Box::new(dhcp_score),
+            Box::new(load_balancer_score),
+            Box::new(tftp_score),
+            Box::new(http_score),
+            Box::new(OPNsenseShellCommandScore {
+                opnsense: opnsense.get_opnsense_config(),
+                command: "touch /tmp/helloharmonytouching".to_string(),
+            }),
+            Box::new(SuccessScore {}),
+            Box::new(ErrorScore {}),
+            Box::new(PanicScore {}),
+        ],
+    )
+    .await
+    .unwrap();
 }
@@ -1,12 +0,0 @@
-[package]
-name = "example_remove_rook_osd"
-edition = "2024"
-version.workspace = true
-readme.workspace = true
-license.workspace = true
-
-[dependencies]
-harmony = { version = "0.1.0", path = "../../harmony" }
-harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
-harmony_tui = { version = "0.1.0", path = "../../harmony_tui" }
-tokio.workspace = true
@@ -1,18 +0,0 @@
-use harmony::{
-    inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
-    topology::K8sAnywhereTopology,
-};
-
-#[tokio::main]
-async fn main() {
-    let ceph_score = CephRemoveOsd {
-        osd_deployment_name: "rook-ceph-osd-2".to_string(),
-        rook_ceph_namespace: "rook-ceph".to_string(),
-    };
-
-    let topology = K8sAnywhereTopology::from_env();
-    let inventory = Inventory::autoload();
-    harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
-        .await
-        .unwrap();
-}
@@ -2,7 +2,6 @@ use std::net::{SocketAddr, SocketAddrV4};

 use harmony::{
     inventory::Inventory,
-    maestro::Maestro,
     modules::{
         dns::DnsScore,
         dummy::{ErrorScore, PanicScore, SuccessScore},
@@ -16,18 +15,19 @@ use harmony_macros::ipv4;

 #[tokio::main]
 async fn main() {
-    let inventory = Inventory::autoload();
-    let topology = DummyInfra {};
-    let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
-
-    maestro.register_all(vec![
-        Box::new(SuccessScore {}),
-        Box::new(ErrorScore {}),
-        Box::new(PanicScore {}),
-        Box::new(DnsScore::new(vec![], None)),
-        Box::new(build_large_score()),
-    ]);
-    harmony_tui::init(maestro).await.unwrap();
+    harmony_tui::run(
+        Inventory::autoload(),
+        DummyInfra {},
+        vec![
+            Box::new(SuccessScore {}),
+            Box::new(ErrorScore {}),
+            Box::new(PanicScore {}),
+            Box::new(DnsScore::new(vec![], None)),
+            Box::new(build_large_score()),
+        ],
+    )
+    .await
+    .unwrap();
 }

 fn build_large_score() -> LoadBalancerScore {
@@ -241,7 +241,7 @@ pub struct DummyInfra;
 #[async_trait]
 impl Topology for DummyInfra {
     fn name(&self) -> &str {
-        todo!()
+        "DummyInfra"
     }

     async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
@@ -5,7 +5,7 @@ use k8s_openapi::{
 };
 use kube::{
     Client, Config, Error, Resource,
-    api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
+    api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
     config::{KubeConfigOptions, Kubeconfig},
     core::ErrorResponse,
     runtime::reflector::Lookup,
@@ -17,9 +17,7 @@ use kube::{
 };
 use log::{debug, error, trace};
 use serde::{Serialize, de::DeserializeOwned};
-use serde_json::json;
 use similar::TextDiff;
-use tokio::io::AsyncReadExt;

 #[derive(new, Clone)]
 pub struct K8sClient {
@@ -53,66 +51,6 @@ impl K8sClient {
         })
     }

-    pub async fn get_deployment(
-        &self,
-        name: &str,
-        namespace: Option<&str>,
-    ) -> Result<Option<Deployment>, Error> {
-        let deps: Api<Deployment> = if let Some(ns) = namespace {
-            Api::namespaced(self.client.clone(), ns)
-        } else {
-            Api::default_namespaced(self.client.clone())
-        };
-        Ok(deps.get_opt(name).await?)
-    }
-
-    pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
-        let pods: Api<Pod> = if let Some(ns) = namespace {
-            Api::namespaced(self.client.clone(), ns)
-        } else {
-            Api::default_namespaced(self.client.clone())
-        };
-        Ok(pods.get_opt(name).await?)
-    }
-
-    pub async fn scale_deployment(
-        &self,
-        name: &str,
-        namespace: Option<&str>,
-        replicas: u32,
-    ) -> Result<(), Error> {
-        let deployments: Api<Deployment> = if let Some(ns) = namespace {
-            Api::namespaced(self.client.clone(), ns)
-        } else {
-            Api::default_namespaced(self.client.clone())
-        };
-
-        let patch = json!({
-            "spec": {
-                "replicas": replicas
-            }
-        });
-        let pp = PatchParams::default();
-        let scale = Patch::Apply(&patch);
-        deployments.patch_scale(name, &pp, &scale).await?;
-        Ok(())
-    }
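With this helper gone (its only caller, the CephRemoveOsd interpret deleted later in this compare, is removed too), scaling falls back to kube-rs directly. A sketch of the equivalent call, assuming the same kube-rs API surface imported above; the namespace and deployment name are taken from the deleted example and are illustrative:

    use k8s_openapi::api::apps::v1::Deployment;
    use kube::api::{Api, Patch, PatchParams};

    async fn scale_osd_to_zero(client: kube::Client) -> Result<(), kube::Error> {
        let deployments: Api<Deployment> = Api::namespaced(client, "rook-ceph");
        // Merge-patch the scale subresource down to zero replicas.
        let patch = serde_json::json!({ "spec": { "replicas": 0 } });
        deployments
            .patch_scale("rook-ceph-osd-2", &PatchParams::default(), &Patch::Merge(&patch))
            .await?;
        Ok(())
    }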
-    pub async fn delete_deployment(
-        &self,
-        name: &str,
-        namespace: Option<&str>,
-    ) -> Result<(), Error> {
-        let deployments: Api<Deployment> = if let Some(ns) = namespace {
-            Api::namespaced(self.client.clone(), ns)
-        } else {
-            Api::default_namespaced(self.client.clone())
-        };
-        let delete_params = DeleteParams::default();
-        deployments.delete(name, &delete_params).await?;
-        Ok(())
-    }

     pub async fn wait_until_deployment_ready(
         &self,
         name: String,
@@ -138,68 +76,6 @@ impl K8sClient {
         }
     }

-    /// Will execute a command in the first pod found that matches the specified label
-    /// '{label}={name}'
-    pub async fn exec_app_capture_output(
-        &self,
-        name: String,
-        label: String,
-        namespace: Option<&str>,
-        command: Vec<&str>,
-    ) -> Result<String, String> {
-        let api: Api<Pod>;
-
-        if let Some(ns) = namespace {
-            api = Api::namespaced(self.client.clone(), ns);
-        } else {
-            api = Api::default_namespaced(self.client.clone());
-        }
-        let pod_list = api
-            .list(&ListParams::default().labels(format!("{label}={name}").as_str()))
-            .await
-            .expect("couldn't get list of pods");
-
-        let res = api
-            .exec(
-                pod_list
-                    .items
-                    .first()
-                    .expect("couldn't get pod")
-                    .name()
-                    .expect("couldn't get pod name")
-                    .into_owned()
-                    .as_str(),
-                command,
-                &AttachParams::default().stdout(true).stderr(true),
-            )
-            .await;
-        match res {
-            Err(e) => Err(e.to_string()),
-            Ok(mut process) => {
-                let status = process
-                    .take_status()
-                    .expect("Couldn't get status")
-                    .await
-                    .expect("Couldn't unwrap status");
-
-                if let Some(s) = status.status {
-                    let mut stdout_buf = String::new();
-                    if let Some(mut stdout) = process.stdout().take() {
-                        stdout.read_to_string(&mut stdout_buf).await;
-                    }
-                    debug!("Status: {} - {:?}", s, status.details);
-                    if s == "Success" {
-                        Ok(stdout_buf)
-                    } else {
-                        Err(s)
-                    }
-                } else {
-                    Err("Couldn't get inner status of pod exec".to_string())
-                }
-            }
-        }
-    }

     /// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
     pub async fn exec_app(
         &self,
@@ -14,6 +14,5 @@ pub mod monitoring;
 pub mod okd;
 pub mod opnsense;
 pub mod prometheus;
-pub mod storage;
 pub mod tenant;
 pub mod tftp;
@@ -1,419 +0,0 @@
-use std::{
-    process::Command,
-    sync::Arc,
-    time::{Duration, Instant},
-};
-
-use async_trait::async_trait;
-use log::{info, warn};
-use serde::{Deserialize, Serialize};
-use tokio::time::sleep;
-
-use crate::{
-    data::{Id, Version},
-    interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
-    inventory::Inventory,
-    score::Score,
-    topology::{K8sclient, Topology, k8s::K8sClient},
-};
-
-#[derive(Debug, Clone, Serialize)]
-pub struct CephRemoveOsd {
-    pub osd_deployment_name: String,
-    pub rook_ceph_namespace: String,
-}
-
-impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
-    fn name(&self) -> String {
-        format!("CephRemoveOsdScore")
-    }
-
-    #[doc(hidden)]
-    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
-        Box::new(CephRemoveOsdInterpret {
-            score: self.clone(),
-        })
-    }
-}
-
-#[derive(Debug, Clone)]
-pub struct CephRemoveOsdInterpret {
-    score: CephRemoveOsd,
-}
-
-#[async_trait]
-impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
-    async fn execute(
-        &self,
-        _inventory: &Inventory,
-        topology: &T,
-    ) -> Result<Outcome, InterpretError> {
-        let client = topology.k8s_client().await.unwrap();
-        self.verify_ceph_toolbox_exists(client.clone()).await?;
-        self.scale_deployment(client.clone()).await?;
-        self.verify_deployment_scaled(client.clone()).await?;
-        self.delete_deployment(client.clone()).await?;
-        self.verify_deployment_deleted(client.clone()).await?;
-        let osd_id_full = self.get_ceph_osd_id().unwrap();
-        self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
-        self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
-            .await?;
-
-        Ok(Outcome::success(format!(
-            "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
-            osd_id_full, self.score.osd_deployment_name
-        )))
-    }
-    fn get_name(&self) -> InterpretName {
-        todo!()
-    }
-
-    fn get_version(&self) -> Version {
-        todo!()
-    }
-
-    fn get_status(&self) -> InterpretStatus {
-        todo!()
-    }
-
-    fn get_children(&self) -> Vec<Id> {
-        todo!()
-    }
-}
-
-impl CephRemoveOsdInterpret {
-    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
-        let osd_id_numeric = self
-            .score
-            .osd_deployment_name
-            .split('-')
-            .nth(3)
-            .ok_or_else(|| {
-                InterpretError::new(format!(
-                    "Could not parse OSD id from deployment name {}",
-                    self.score.osd_deployment_name
-                ))
-            })?;
-        let osd_id_full = format!("osd.{}", osd_id_numeric);
-
-        info!(
-            "Targeting Ceph OSD: {} (parsed from deployment {})",
-            osd_id_full, self.score.osd_deployment_name
-        );
-
-        Ok(osd_id_full)
-    }
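For reference, a worked trace of the parsing above, using the deployment name from the deleted example:

    // "rook-ceph-osd-2".split('-')   -> ["rook", "ceph", "osd", "2"]
    // .nth(3)                        -> Some("2")
    // format!("osd.{}", "2")         -> "osd.2"

Note that purge_ceph_osd below interpolates this value into `ceph auth del osd.{osd_id_full}`, which expands to `osd.osd.2` because the `osd.` prefix is already present.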
-
-    pub async fn verify_ceph_toolbox_exists(
-        &self,
-        client: Arc<K8sClient>,
-    ) -> Result<Outcome, InterpretError> {
-        let toolbox_dep = "rook-ceph-tools".to_string();
-
-        match client
-            .get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
-            .await
-        {
-            Ok(Some(deployment)) => {
-                if let Some(status) = deployment.status {
-                    let ready_count = status.ready_replicas.unwrap_or(0);
-                    if ready_count >= 1 {
-                        return Ok(Outcome::success(format!(
-                            "'{}' is ready with {} replica(s).",
-                            &toolbox_dep, ready_count
-                        )));
-                    } else {
-                        return Err(InterpretError::new(
-                            "ceph-tool-box not ready in cluster".to_string(),
-                        ));
-                    }
-                } else {
-                    Err(InterpretError::new(format!(
-                        "failed to get deployment status {}",
-                        &toolbox_dep
-                    )))
-                }
-            }
-            Ok(None) => Err(InterpretError::new(format!(
-                "Deployment '{}' not found in namespace '{}'.",
-                &toolbox_dep, self.score.rook_ceph_namespace
-            ))),
-            Err(e) => Err(InterpretError::new(format!(
-                "Failed to query for deployment '{}': {}",
-                &toolbox_dep, e
-            ))),
-        }
-    }
-
-    pub async fn scale_deployment(
-        &self,
-        client: Arc<K8sClient>,
-    ) -> Result<Outcome, InterpretError> {
-        info!(
-            "Scaling down OSD deployment: {}",
-            self.score.osd_deployment_name
-        );
-        client
-            .scale_deployment(
-                &self.score.osd_deployment_name,
-                Some(&self.score.rook_ceph_namespace),
-                0,
-            )
-            .await?;
-        Ok(Outcome::success(format!(
-            "Scaled down deployment {}",
-            self.score.osd_deployment_name
-        )))
-    }
-
-    pub async fn verify_deployment_scaled(
-        &self,
-        client: Arc<K8sClient>,
-    ) -> Result<Outcome, InterpretError> {
-        let (timeout, interval, start) = self.build_timer();
-
-        info!("Waiting for OSD deployment to scale down to 0 replicas");
-        loop {
-            let dep = client
-                .get_deployment(
-                    &self.score.osd_deployment_name,
-                    Some(&self.score.rook_ceph_namespace),
-                )
-                .await?;
-
-            if let Some(deployment) = dep {
-                if let Some(status) = deployment.status {
-                    if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
-                    {
-                        return Ok(Outcome::success(
-                            "Deployment successfully scaled down.".to_string(),
-                        ));
-                    }
-                }
-            }
-
-            if start.elapsed() > timeout {
-                return Err(InterpretError::new(format!(
-                    "Timed out waiting for deployment {} to scale down",
-                    self.score.osd_deployment_name
-                )));
-            }
-            sleep(interval).await;
-        }
-    }
-
-    fn build_timer(&self) -> (Duration, Duration, Instant) {
-        let timeout = Duration::from_secs(120);
-        let interval = Duration::from_secs(5);
-        let start = Instant::now();
-        (timeout, interval, start)
-    }
-    pub async fn delete_deployment(
-        &self,
-        client: Arc<K8sClient>,
-    ) -> Result<Outcome, InterpretError> {
-        info!(
-            "Deleting OSD deployment: {}",
-            self.score.osd_deployment_name
-        );
-        client
-            .delete_deployment(
-                &self.score.osd_deployment_name,
-                Some(&self.score.rook_ceph_namespace),
-            )
-            .await?;
-        Ok(Outcome::success(format!(
-            "deployment {} deleted",
-            self.score.osd_deployment_name
-        )))
-    }
-
-    pub async fn verify_deployment_deleted(
-        &self,
-        client: Arc<K8sClient>,
-    ) -> Result<Outcome, InterpretError> {
-        let (timeout, interval, start) = self.build_timer();
-
-        info!("Waiting for OSD deployment to scale down to 0 replicas");
-        loop {
-            let dep = client
-                .get_deployment(
-                    &self.score.osd_deployment_name,
-                    Some(&self.score.rook_ceph_namespace),
-                )
-                .await?;
-
-            if dep.is_none() {
-                info!(
-                    "Deployment {} successfully deleted.",
-                    self.score.osd_deployment_name
-                );
-                return Ok(Outcome::success(format!(
-                    "Deployment {} deleted.",
-                    self.score.osd_deployment_name
-                )));
-            }
-
-            if start.elapsed() > timeout {
-                return Err(InterpretError::new(format!(
-                    "Timed out waiting for deployment {} to be deleted",
-                    self.score.osd_deployment_name
-                )));
-            }
-            sleep(interval).await;
-        }
-    }
-
-    fn get_osd_tree(&self, json: serde_json::Value) -> Result<CephOsdTree, InterpretError> {
-        let nodes = json.get("nodes").ok_or_else(|| {
-            InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string())
-        })?;
-        let tree: CephOsdTree = CephOsdTree {
-            nodes: serde_json::from_value(nodes.clone()).map_err(|e| {
-                InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e))
-            })?,
-        };
-        Ok(tree)
-    }
-
-    pub async fn purge_ceph_osd(
-        &self,
-        client: Arc<K8sClient>,
-        osd_id_full: &str,
-    ) -> Result<Outcome, InterpretError> {
-        info!(
-            "Purging OSD {} from Ceph cluster and removing its auth key",
-            osd_id_full
-        );
-        client
-            .exec_app_capture_output(
-                "rook-ceph-tools".to_string(),
-                "app".to_string(),
-                Some(&self.score.rook_ceph_namespace),
-                vec![
-                    format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
-                    format!("ceph auth del osd.{osd_id_full}").as_str(),
-                ],
-            )
-            .await?;
-        Ok(Outcome::success(format!(
-            "osd id {} removed from osd tree",
-            osd_id_full
-        )))
-    }
-
-    pub async fn verify_ceph_osd_removal(
-        &self,
-        client: Arc<K8sClient>,
-        osd_id_full: &str,
-    ) -> Result<Outcome, InterpretError> {
-        let (timeout, interval, start) = self.build_timer();
-        info!(
-            "Verifying OSD {} has been removed from the Ceph tree...",
-            osd_id_full
-        );
-        loop {
-            let output = client
-                .exec_app_capture_output(
-                    "rook-ceph-tools".to_string(),
-                    "app".to_string(),
-                    Some(&self.score.rook_ceph_namespace),
-                    vec!["ceph osd tree -f json"],
-                )
-                .await?;
-            let tree =
-                self.get_osd_tree(serde_json::from_str(&output).expect("could not extract json"));
-
-            let osd_found = tree
-                .unwrap()
-                .nodes
-                .iter()
-                .any(|node| node.name == osd_id_full);
-
-            if !osd_found {
-                return Ok(Outcome::success(format!(
-                    "Successfully verified that OSD {} is removed from the Ceph cluster.",
-                    osd_id_full,
-                )));
-            }
-
-            if start.elapsed() > timeout {
-                return Err(InterpretError::new(format!(
-                    "Timed out waiting for OSD {} to be removed from Ceph tree",
-                    osd_id_full
-                )));
-            }
-
-            warn!(
-                "OSD {} still found in Ceph tree, retrying in {:?}...",
-                osd_id_full, interval
-            );
-            sleep(interval).await;
-        }
-    }
-}
-#[derive(Debug, Deserialize, PartialEq)]
-pub struct CephOsdTree {
-    pub nodes: Vec<CephNode>,
-}
-
-#[derive(Debug, Deserialize, PartialEq)]
-pub struct CephNode {
-    pub id: i32,
-    pub name: String,
-    #[serde(rename = "type")]
-    pub node_type: String,
-    pub type_id: Option<i32>,
-    pub children: Option<Vec<i32>>,
-    pub exists: Option<i32>,
-    pub status: Option<String>,
-}
-
-#[cfg(test)]
-mod tests {
-    use serde_json::json;
-
-    use super::*;
-
-    #[test]
-    fn test_get_osd_tree() {
-        let json_data = json!({
-            "nodes": [
-                {"id": 1, "name": "osd.1", "type": "osd", "primary_affinity":"1"},
-                {"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344}
-            ]
-        });
-        let interpret = CephRemoveOsdInterpret {
-            score: CephRemoveOsd {
-                osd_deployment_name: "osd-1".to_string(),
-                rook_ceph_namespace: "dummy_ns".to_string(),
-            },
-        };
-        let json = interpret.get_osd_tree(json_data).unwrap();
-
-        let expected = CephOsdTree {
-            nodes: vec![
-                CephNode {
-                    id: 1,
-                    name: "osd.1".to_string(),
-                    node_type: "osd".to_string(),
-                    type_id: None,
-                    children: None,
-                    exists: None,
-                    status: None,
-                },
-                CephNode {
-                    id: 2,
-                    name: "osd.2".to_string(),
-                    node_type: "osd".to_string(),
-                    type_id: None,
-                    children: None,
-                    exists: None,
-                    status: None,
-                },
-            ],
-        };
-
-        assert_eq!(json, expected);
-    }
-}
@@ -1 +0,0 @@
-pub mod ceph_remove_osd_score;
@@ -1 +0,0 @@
-pub mod ceph;
@@ -22,6 +22,7 @@ indicatif = "0.18.0"
 lazy_static = "1.5.0"
 log.workspace = true
 indicatif-log-bridge = "0.2.3"
+chrono.workspace = true

 [dev-dependencies]
 harmony = { path = "../harmony", features = ["testing"] }
@@ -1,22 +1,17 @@
+use chrono::Local;
 use console::style;
 use harmony::{
     instrumentation::{self, HarmonyEvent},
     modules::application::ApplicationFeatureStatus,
     topology::TopologyStatus,
 };
-use indicatif::MultiProgress;
-use indicatif_log_bridge::LogWrapper;
-use log::error;
-use std::{
-    sync::{Arc, Mutex},
-    thread,
-    time::Duration,
-};
-
-use crate::progress::{IndicatifProgressTracker, ProgressTracker};
+use log::{error, info, log_enabled};
+use std::io::Write;
+use std::sync::{Arc, Mutex};

 pub fn init() -> tokio::task::JoinHandle<()> {
-    let base_progress = configure_logger();
-    let handle = tokio::spawn(handle_events(base_progress));
+    configure_logger();
+    let handle = tokio::spawn(handle_events());

     loop {
         if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() {
@@ -27,28 +22,76 @@ pub fn init() -> tokio::task::JoinHandle<()> {
     handle
 }

-fn configure_logger() -> MultiProgress {
-    let logger =
-        env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
-    let level = logger.filter();
-    let progress = MultiProgress::new();
-
-    LogWrapper::new(progress.clone(), logger)
-        .try_init()
-        .unwrap();
-    log::set_max_level(level);
-
-    progress
-}
+fn configure_logger() {
+    env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
+        .format(|buf, record| {
+            let debug_mode = log_enabled!(log::Level::Debug);
+            let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S");
+
+            let level = match record.level() {
+                log::Level::Error => style("ERROR").red(),
+                log::Level::Warn => style("WARN").yellow(),
+                log::Level::Info => style("INFO").green(),
+                log::Level::Debug => style("DEBUG").blue(),
+                log::Level::Trace => style("TRACE").magenta(),
+            };
+            if let Some(status) = record.key_values().get(log::kv::Key::from("status")) {
+                let status = status.to_borrowed_str().unwrap();
+                let emoji = match status {
+                    "finished" => style(crate::theme::EMOJI_SUCCESS.to_string()).green(),
+                    "skipped" => style(crate::theme::EMOJI_SKIP.to_string()).yellow(),
+                    "failed" => style(crate::theme::EMOJI_ERROR.to_string()).red(),
+                    _ => style("".into()),
+                };
+                if debug_mode {
+                    writeln!(
+                        buf,
+                        "[{} {:<5} {}] {} {}",
+                        timestamp,
+                        level,
+                        record.target(),
+                        emoji,
+                        record.args()
+                    )
+                } else {
+                    writeln!(buf, "[{:<5}] {} {}", level, emoji, record.args())
+                }
+            } else if let Some(emoji) = record.key_values().get(log::kv::Key::from("emoji")) {
+                if debug_mode {
+                    writeln!(
+                        buf,
+                        "[{} {:<5} {}] {} {}",
+                        timestamp,
+                        level,
+                        record.target(),
+                        emoji,
+                        record.args()
+                    )
+                } else {
+                    writeln!(buf, "[{:<5}] {} {}", level, emoji, record.args())
+                }
+            } else if debug_mode {
+                writeln!(
+                    buf,
+                    "[{} {:<5} {}] {}",
+                    timestamp,
+                    level,
+                    record.target(),
+                    record.args()
+                )
+            } else {
+                writeln!(buf, "[{:<5}] {}", level, record.args())
+            }
+        })
+        .init();
+}
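For illustration, a status-tagged record passes through the formatter above roughly like this (the glyphs come from crate::theme; the target shown is illustrative):

    // info!(status = "finished"; "Deployment rook-ceph-osd-2 deleted");
    //
    // default output:       [INFO ] <EMOJI_SUCCESS> Deployment rook-ceph-osd-2 deleted
    // with RUST_LOG=debug:  [2025-01-01 12:00:00 INFO  harmony_cli::cli_logger] <EMOJI_SUCCESS> Deployment rook-ceph-osd-2 deleted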

-async fn handle_events(base_progress: MultiProgress) {
-    let progress_tracker = Arc::new(IndicatifProgressTracker::new(base_progress.clone()));
+async fn handle_events() {
     let preparing_topology = Arc::new(Mutex::new(false));
     let current_score: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));

     instrumentation::subscribe("Harmony CLI Logger", {
         move |event| {
-            let progress_tracker = Arc::clone(&progress_tracker);
             let preparing_topology = Arc::clone(&preparing_topology);
             let current_score = Arc::clone(&current_score);

@@ -59,90 +102,57 @@ async fn handle_events(base_progress: MultiProgress) {
             match event {
                 HarmonyEvent::HarmonyStarted => {}
                 HarmonyEvent::HarmonyFinished => {
-                    progress_tracker.add_section(
-                        "harmony-summary",
-                        &format!("\n{} Harmony completed\n\n", crate::theme::EMOJI_HARMONY),
-                    );
-                    progress_tracker.add_section("harmony-finished", "\n\n");
-                    thread::sleep(Duration::from_millis(200));
+                    let emoji = crate::theme::EMOJI_HARMONY.to_string();
+                    info!(emoji = emoji.as_str(); "Harmony completed");
                     return false;
                 }
                 HarmonyEvent::TopologyStateChanged {
                     topology,
                     status,
                     message,
-                } => {
-                    let section_key = topology_key(&topology);
-
-                    match status {
-                        TopologyStatus::Queued => {}
-                        TopologyStatus::Preparing => {
-                            progress_tracker.add_section(
-                                &section_key,
-                                &format!(
-                                    "\n{} Preparing environment: {topology}...",
-                                    crate::theme::EMOJI_TOPOLOGY
-                                ),
-                            );
-                            (*preparing_topology) = true;
-                        }
-                        TopologyStatus::Success => {
-                            (*preparing_topology) = false;
-                            progress_tracker.add_task(&section_key, "topology-success", "");
-                            progress_tracker
-                                .finish_task("topology-success", &message.unwrap_or("".into()));
-                        }
-                        TopologyStatus::Noop => {
-                            (*preparing_topology) = false;
-                            progress_tracker.add_task(&section_key, "topology-skip", "");
-                            progress_tracker
-                                .skip_task("topology-skip", &message.unwrap_or("".into()));
-                        }
-                        TopologyStatus::Error => {
-                            progress_tracker.add_task(&section_key, "topology-error", "");
-                            (*preparing_topology) = false;
-                            progress_tracker
-                                .fail_task("topology-error", &message.unwrap_or("".into()));
-                        }
-                    }
-                }
+                } => match status {
+                    TopologyStatus::Queued => {}
+                    TopologyStatus::Preparing => {
+                        let emoji = format!("{}", style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow());
+                        info!(emoji = emoji.as_str(); "Preparing environment: {topology}...");
+                        (*preparing_topology) = true;
+                    }
+                    TopologyStatus::Success => {
+                        (*preparing_topology) = false;
+                        if let Some(message) = message {
+                            info!(status = "finished"; "{message}");
+                        }
+                    }
+                    TopologyStatus::Noop => {
+                        (*preparing_topology) = false;
+                        if let Some(message) = message {
+                            info!(status = "skipped"; "{message}");
+                        }
+                    }
+                    TopologyStatus::Error => {
+                        (*preparing_topology) = false;
+                        if let Some(message) = message {
+                            error!(status = "failed"; "{message}");
+                        }
+                    }
+                },
                 HarmonyEvent::InterpretExecutionStarted {
-                    execution_id: task_key,
-                    topology,
+                    execution_id: _,
+                    topology: _,
                     interpret: _,
                     score,
                     message,
                 } => {
-                    let is_key_topology = (*preparing_topology)
-                        && progress_tracker.contains_section(&topology_key(&topology));
-                    let is_key_current_score = current_score.is_some()
-                        && progress_tracker
-                            .contains_section(&score_key(&current_score.clone().unwrap()));
-                    let is_key_score = progress_tracker.contains_section(&score_key(&score));
-
-                    let section_key = if is_key_topology {
-                        topology_key(&topology)
-                    } else if is_key_current_score {
-                        score_key(&current_score.clone().unwrap())
-                    } else if is_key_score {
-                        score_key(&score)
+                    if *preparing_topology || current_score.is_some() {
+                        info!("{message}");
                     } else {
                         (*current_score) = Some(score.clone());
-                        let key = score_key(&score);
-                        progress_tracker.add_section(
-                            &key,
-                            &format!(
-                                "{} Interpreting score: {score}...",
-                                crate::theme::EMOJI_SCORE
-                            ),
-                        );
-                        key
-                    };
-
-                    progress_tracker.add_task(&section_key, &task_key, &message);
+                        let emoji = format!("{}", style(crate::theme::EMOJI_SCORE).blue());
+                        info!(emoji = emoji.as_str(); "Interpreting score: {score}...");
                     }
                 }
                 HarmonyEvent::InterpretExecutionFinished {
-                    execution_id: task_key,
+                    execution_id: _,
                     topology: _,
                     interpret: _,
                     score,
@@ -155,16 +165,17 @@ async fn handle_events(base_progress: MultiProgress) {
                     match outcome {
                         Ok(outcome) => match outcome.status {
                             harmony::interpret::InterpretStatus::SUCCESS => {
-                                progress_tracker.finish_task(&task_key, &outcome.message);
+                                info!(status = "finished"; "{}", outcome.message);
                             }
                             harmony::interpret::InterpretStatus::NOOP => {
-                                progress_tracker.skip_task(&task_key, &outcome.message);
+                                info!(status = "skipped"; "{}", outcome.message);
                             }
-                            _ => progress_tracker.fail_task(&task_key, &outcome.message),
+                            _ => {
+                                error!(status = "failed"; "{}", outcome.message);
+                            }
                         },
                         Err(err) => {
-                            error!("Interpret error: {err}");
-                            progress_tracker.fail_task(&task_key, &err.to_string());
+                            error!(status = "failed"; "{}", err);
                         }
                     }
                 }
@@ -173,30 +184,17 @@ async fn handle_events(base_progress: MultiProgress) {
                     application,
                     feature,
                     status,
-                } => {
-                    if let Some(score) = &(*current_score) {
-                        let section_key = score_key(score);
-                        let task_key = app_feature_key(&application, &feature);
-
-                        match status {
-                            ApplicationFeatureStatus::Installing => {
-                                let message = format!("Feature '{}' installing...", feature);
-                                progress_tracker.add_task(&section_key, &task_key, &message);
-                            }
-                            ApplicationFeatureStatus::Installed => {
-                                let message = format!("Feature '{}' installed", feature);
-                                progress_tracker.finish_task(&task_key, &message);
-                            }
-                            ApplicationFeatureStatus::Failed { details } => {
-                                let message = format!(
-                                    "Feature '{}' installation failed: {}",
-                                    feature, details
-                                );
-                                progress_tracker.fail_task(&task_key, &message);
-                            }
-                        }
-                    }
-                }
+                } => match status {
+                    ApplicationFeatureStatus::Installing => {
+                        info!("Installing feature '{}' for '{}'...", feature, application);
+                    }
+                    ApplicationFeatureStatus::Installed => {
+                        info!(status = "finished"; "Feature '{}' installed", feature);
+                    }
+                    ApplicationFeatureStatus::Failed { details } => {
+                        error!(status = "failed"; "Feature '{}' installation failed: {}", feature, details);
+                    }
+                },
             }
             true
         }
@@ -204,15 +202,3 @@ async fn handle_events(base_progress: MultiProgress) {
     })
     .await;
 }
-
-fn topology_key(topology: &str) -> String {
-    format!("topology-{topology}")
-}
-
-fn score_key(score: &str) -> String {
-    format!("score-{score}")
-}
-
-fn app_feature_key(application: &str, feature: &str) -> String {
-    format!("app-{application}-{feature}")
-}
@@ -90,13 +90,37 @@ pub async fn run<T: Topology + Send + Sync + 'static>(
     topology: T,
     scores: Vec<Box<dyn Score<T>>>,
     args_struct: Option<Args>,
 ) -> Result<(), Box<dyn std::error::Error>> {
+    let args = match args_struct {
+        Some(args) => args,
+        None => Args::parse(),
+    };
+
+    #[cfg(not(feature = "tui"))]
+    if args.interactive {
+        return Err("Not compiled with interactive support".into());
+    }
+
+    #[cfg(feature = "tui")]
+    if args.interactive {
+        return harmony_tui::run(inventory, topology, scores).await;
+    }
+
+    run_cli(inventory, topology, scores, args).await
+}
+
+pub async fn run_cli<T: Topology + Send + Sync + 'static>(
+    inventory: Inventory,
+    topology: T,
+    scores: Vec<Box<dyn Score<T>>>,
+    args: Args,
+) -> Result<(), Box<dyn std::error::Error>> {
     let cli_logger_handle = cli_logger::init();

     let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
     maestro.register_all(scores);

-    let result = init(maestro, args_struct).await;
+    let result = init(maestro, args).await;

     instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap();
     let _ = tokio::try_join!(cli_logger_handle);
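The public signature of `run` is unchanged; only its internals moved into `run_cli`, so existing call sites keep working. Typical usage, as in the example deleted earlier in this compare:

    // `None` means "parse Args from the process command line"; the test
    // hunks below construct a concrete Args and call the inner init directly.
    harmony_cli::run(inventory, topology, scores, None)
        .await
        .unwrap();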
@@ -105,23 +129,8 @@ pub async fn run<T: Topology + Send + Sync + 'static>(

 async fn init<T: Topology + Send + Sync + 'static>(
     maestro: harmony::maestro::Maestro<T>,
-    args_struct: Option<Args>,
+    args: Args,
 ) -> Result<(), Box<dyn std::error::Error>> {
-    let args = match args_struct {
-        Some(args) => args,
-        None => Args::parse(),
-    };
-
-    #[cfg(feature = "tui")]
-    if args.interactive {
-        return harmony_tui::init(maestro).await;
-    }
-
-    #[cfg(not(feature = "tui"))]
-    if args.interactive {
-        return Err("Not compiled with interactive support".into());
-    }
-
-    let _ = env_logger::builder().try_init();
-
     let scores_vec = maestro_scores_filter(&maestro, args.all, args.filter, args.number);
@@ -193,14 +202,14 @@ mod tests {
         let maestro = init_test_maestro();
         let res = crate::init(
             maestro,
-            Some(crate::Args {
+            crate::Args {
                 yes: true,
                 filter: Some("SuccessScore".to_owned()),
                 interactive: false,
                 all: true,
                 number: 0,
                 list: false,
-            }),
+            },
         )
         .await;
@@ -213,14 +222,14 @@ mod tests {

         let res = crate::init(
             maestro,
-            Some(crate::Args {
+            crate::Args {
                 yes: true,
                 filter: Some("ErrorScore".to_owned()),
                 interactive: false,
                 all: true,
                 number: 0,
                 list: false,
-            }),
+            },
         )
         .await;
@@ -233,14 +242,14 @@ mod tests {

         let res = crate::init(
             maestro,
-            Some(crate::Args {
+            crate::Args {
                 yes: true,
                 filter: None,
                 interactive: false,
                 all: false,
                 number: 0,
                 list: false,
-            }),
+            },
        )
        .await;
@@ -9,7 +9,13 @@ use widget::{help::HelpWidget, score::ScoreListWidget};
 use std::{panic, sync::Arc, time::Duration};

 use crossterm::event::{Event, EventStream, KeyCode, KeyEventKind};
-use harmony::{maestro::Maestro, score::Score, topology::Topology};
+use harmony::{
+    instrumentation::{self, HarmonyEvent},
+    inventory::Inventory,
+    maestro::Maestro,
+    score::Score,
+    topology::Topology,
+};
 use ratatui::{
     self, Frame,
     layout::{Constraint, Layout, Position},
@@ -39,22 +45,62 @@ pub mod tui {
 ///
 /// #[tokio::main]
 /// async fn main() {
-///     let inventory = Inventory::autoload();
-///     let topology = HAClusterTopology::autoload();
-///     let mut maestro = Maestro::new_without_initialization(inventory, topology);
-///
-///     maestro.register_all(vec![
-///         Box::new(SuccessScore {}),
-///         Box::new(ErrorScore {}),
-///         Box::new(PanicScore {}),
-///     ]);
-///     harmony_tui::init(maestro).await.unwrap();
+///     harmony_tui::run(
+///         Inventory::autoload(),
+///         HAClusterTopology::autoload(),
+///         vec![
+///             Box::new(SuccessScore {}),
+///             Box::new(ErrorScore {}),
+///             Box::new(PanicScore {}),
+///         ]
+///     ).await.unwrap();
 /// }
 /// ```
-pub async fn init<T: Topology + Send + Sync + 'static>(
+pub async fn run<T: Topology + Send + Sync + 'static>(
+    inventory: Inventory,
+    topology: T,
+    scores: Vec<Box<dyn Score<T>>>,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let handle = init_instrumentation().await;
+
+    let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
+    maestro.register_all(scores);
+
+    let result = init(maestro).await;
+
+    let _ = tokio::try_join!(handle);
+    result
+}
+
+async fn init<T: Topology + Send + Sync + 'static>(
     maestro: Maestro<T>,
 ) -> Result<(), Box<dyn std::error::Error>> {
-    HarmonyTUI::new(maestro).init().await
+    let result = HarmonyTUI::new(maestro).init().await;
+
+    instrumentation::instrument(HarmonyEvent::HarmonyFinished).unwrap();
+    result
+}
+
+async fn init_instrumentation() -> tokio::task::JoinHandle<()> {
+    let handle = tokio::spawn(handle_harmony_events());
+
+    loop {
+        if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() {
+            break;
+        }
+    }
+
+    handle
+}
+
+async fn handle_harmony_events() {
+    instrumentation::subscribe("Harmony TUI Logger", async |event| {
+        if let HarmonyEvent::HarmonyFinished = event {
+            return false;
+        };
+        true
+    })
+    .await;
 }

 pub struct HarmonyTUI<T: Topology> {