commit a351fd12289b3fe356ed295d3d6492b972f3d9d2 Author: Ian Letourneau Date: Fri Sep 12 20:43:50 2025 -0400 Structure the Harmony core to rely on a DAG for declaring & executing Scores diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bc9f573 --- /dev/null +++ b/.gitignore @@ -0,0 +1,19 @@ +### Rust ### +# Generated by Cargo +# will have compiled files and executables +debug/ +target/ + +# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries +# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html +Cargo.lock + +# These are backup files generated by rustfmt +**/*.rs.bk + +# MSVC Windows builds of rustc generate these, which store debugging information +*.pdb + +### rust-analyzer ### +# Can be generated by other build systems other than cargo (ex: bazelbuild/rust_rules) +rust-project.json diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..bdc33e4 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,3 @@ +[workspace] +resolver = "3" +members = [ "examples/harmony","harmony-core", "harmony-derive"] diff --git a/examples/harmony/Cargo.toml b/examples/harmony/Cargo.toml new file mode 100644 index 0000000..4409ce0 --- /dev/null +++ b/examples/harmony/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "example-harmony" +version = "0.1.0" +edition = "2024" + +[dependencies] +async-trait = "0.1.89" +harmony-core = { version = "0.1.0", path = "../../harmony-core" } +harmony-derive = { version = "0.1.0", path = "../../harmony-derive" } +inquire = "0.7.5" +serde = { version = "1.0.219", features = ["derive"] } +serde_json = "1.0.143" +tokio = "1.47.1" diff --git a/examples/harmony/src/main.rs b/examples/harmony/src/main.rs new file mode 100644 index 0000000..468132e --- /dev/null +++ b/examples/harmony/src/main.rs @@ -0,0 +1,801 @@ +use std::fmt::Display; + +use async_trait::async_trait; +use harmony_core::{ + Dependency, ExecutionContext, Id, Interpret, InterpretError, InterpretOutcome, Inventory, + LinkedValue, Maestro, Score, Topology, dependency, +}; +use serde::{Deserialize, Serialize}; + +trait Ingress { + fn get_domain(&self, service: &str) -> String; +} + +trait DhcpServer { + fn get_gateway_ip(&self) -> String; +} + +trait LoadBalancer { + fn get_domain_name(&self) -> String; + fn get_load_balancer_ip(&self) -> String; +} + +#[derive(Debug, Default)] +pub struct HaClusterTopology; + +impl Topology for HaClusterTopology { + fn name(&self) -> &str { + "HaCluster" + } +} + +impl Ingress for HaClusterTopology { + fn get_domain(&self, service: &str) -> String { + format!("https://{service}.domain.com") + } +} + +impl DhcpServer for HaClusterTopology { + fn get_gateway_ip(&self) -> String { + "1.1.1.1".into() + } +} + +impl LoadBalancer for HaClusterTopology { + fn get_domain_name(&self) -> String { + "domain.com".into() + } + + fn get_load_balancer_ip(&self) -> String { + "192.168.1.1".into() + } +} + +#[derive(Clone, Debug)] +pub struct IngressScore { + service: String, +} + +impl Display for IngressScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!("IngressScore[{}]", self.service)) + } +} + +impl Score for IngressScore { + fn id(&self) -> Id { + Id(format!("ingress-{}", self.service)) + } + + fn name(&self) -> String { + format!("IngressScore[{}]", self.service) + } + + fn depends_on(&self) -> Vec> { + vec![] + } + + fn create_interpret(&self) -> Box> { + Box::new(IngressInterpret { + score: self.clone(), + }) + } +} + +#[derive(Debug)] +pub struct IngressInterpret { + score: IngressScore, +} + +#[async_trait] +impl Interpret for IngressInterpret { + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + context: ExecutionContext, + ) -> Result { + let domain = topology.get_domain(&self.score.service); + context.insert(Id("domain".into()), &domain); + + println!("Ingress domain: {domain}"); + + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug)] +struct ApplicationScore { + name: String, + domain: LinkedValue, +} + +impl Display for ApplicationScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!("ApplicationScore[{}]", self.name)) + } +} + +impl ApplicationScore { + fn new(name: String) -> Self { + Self { + name: name.clone(), + domain: LinkedValue::ContextKey(Id("ingress.domain".into())), + } + } +} + +impl Score for ApplicationScore { + fn id(&self) -> Id { + Id(format!("application-{}", self.name)) + } + + fn name(&self) -> String { + format!("ApplicationScore[{}]", self.name) + } + + fn depends_on(&self) -> Vec> { + vec![dependency!( + Id("ingress".into()), + IngressScore { + service: self.name.clone() + } + )] + } + + fn create_interpret(&self) -> Box> { + Box::new(ApplicationInterpret { + score: self.clone(), + }) + } +} + +#[derive(Debug)] +struct ApplicationInterpret { + score: ApplicationScore, +} + +#[async_trait] +impl Interpret for ApplicationInterpret { + async fn execute( + &self, + _inventory: &Inventory, + _topology: &T, + context: ExecutionContext, + ) -> Result { + let domain = self.score.domain.resolve(&context)?; + + println!("{}: {}", self.score.name, domain); + + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug)] +struct DhcpServerScore {} + +impl Display for DhcpServerScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("DhcpServerScore") + } +} + +impl Score for DhcpServerScore { + fn id(&self) -> Id { + Id("dhcp-server".into()) + } + + fn name(&self) -> String { + "DhcpServerScore".into() + } + + fn depends_on(&self) -> Vec> { + vec![] + } + + fn create_interpret(&self) -> Box> { + Box::new(DhcpServerInterpret {}) + } +} + +#[derive(Debug)] +struct DhcpServerInterpret {} + +#[async_trait] +impl Interpret for DhcpServerInterpret { + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + context: ExecutionContext, + ) -> Result { + let gateway_ip = topology.get_gateway_ip(); + context.insert(Id("gateway-ip".into()), gateway_ip); + + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug)] +struct DhcpScore { + host_binding: Vec, + domain: Option, + next_server: LinkedValue, +} + +impl DhcpScore { + fn new(host_binding: Vec, domain: Option, next_server: Option) -> Self { + Self { + host_binding, + domain, + next_server: match next_server { + Some(next_server) => LinkedValue::Value(next_server), + None => LinkedValue::ContextKey(Id("dhcp-server.gateway-ip".into())), + }, + } + } +} + +impl Display for DhcpScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("DhcpScore") + } +} + +impl Score for DhcpScore { + fn id(&self) -> Id { + Id("dhcp".into()) + } + + fn name(&self) -> String { + "DhcpScore".into() + } + + fn depends_on(&self) -> Vec> { + let mut dependencies: Vec> = vec![]; + + if let LinkedValue::ContextKey(_) = &self.next_server { + dependencies.push(dependency!(Id("dhcp-server".into()), DhcpServerScore {})); + } + + dependencies + } + + fn create_interpret(&self) -> Box> { + Box::new(DhcpInterpret { + score: self.clone(), + }) + } +} + +#[derive(Debug)] +struct DhcpInterpret { + score: DhcpScore, +} + +#[async_trait] +impl Interpret for DhcpInterpret { + async fn execute( + &self, + _inventory: &Inventory, + _topology: &T, + context: ExecutionContext, + ) -> Result { + let next_server = self.score.next_server.resolve(&context)?; + println!( + "Configuring DHCP [\n - next_server: {:?}\n - host_binding: {:?}\n - domain: {:?}\n]", + next_server, self.score.host_binding, self.score.domain + ); + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug)] +struct OkdIpxeScore { + kickstart_filename: String, + harmony_inventory_agent: String, +} + +impl Display for OkdIpxeScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!("OkdIpxeScore[{}]", self.kickstart_filename)) + } +} + +impl Score for OkdIpxeScore { + fn id(&self) -> Id { + Id("okd-ipxe".into()) + } + + fn name(&self) -> String { + format!("OkdIpxeScore[{}]", self.kickstart_filename) + } + + fn depends_on(&self) -> Vec> { + vec![ + dependency!(Id("dhcp".into()), DhcpScore::new(vec![], None, None)), + dependency!( + Id("tftp".into()), + TftpScore { + files_to_serve: "./data/pxe/okd/tftpboot/".into(), + } + ), + ] + } + + fn create_interpret(&self) -> Box> { + Box::new(OkdIpxeInterpret {}) + } +} + +#[derive(Debug)] +struct OkdIpxeInterpret {} + +#[async_trait] +impl Interpret for OkdIpxeInterpret { + async fn execute( + &self, + _inventory: &Inventory, + _topology: &T, + _context: ExecutionContext, + ) -> Result { + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug)] +struct TftpScore { + files_to_serve: String, +} + +impl Display for TftpScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!("TftpScore[{}]", self.files_to_serve)) + } +} + +impl Score for TftpScore { + fn id(&self) -> Id { + Id("TftpScore".into()) + } + + fn name(&self) -> String { + "TftpScore".into() + } + + fn depends_on(&self) -> Vec> { + vec![] + } + + fn create_interpret(&self) -> Box> { + Box::new(TftpInterpret {}) + } +} + +#[derive(Debug)] +struct TftpInterpret {} + +#[async_trait] +impl Interpret for TftpInterpret { + async fn execute( + &self, + _inventory: &Inventory, + _topology: &T, + _context: ExecutionContext, + ) -> Result { + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug)] +struct OkdInstallationScore {} + +impl Display for OkdInstallationScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("OkdInstallationScore") + } +} + +impl Score for OkdInstallationScore { + fn id(&self) -> Id { + Id("okd-installation".into()) + } + + fn name(&self) -> String { + "OkdInstallationScore".into() + } + + fn depends_on(&self) -> Vec> { + vec![ + dependency!(Id("prepare".into()), OkdInstallationPrepareScore::new()), + dependency!(Id("bootstrap".into()), OkdInstallationBootstrapScore {}), + ] + } + + fn create_interpret(&self) -> Box> { + Box::new(OkdInstallationInterpret {}) + } +} + +#[derive(Debug)] +struct OkdInstallationInterpret {} + +#[async_trait] +impl Interpret for OkdInstallationInterpret { + async fn execute( + &self, + _inventory: &Inventory, + _topology: &T, + _context: ExecutionContext, + ) -> Result { + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +struct PhysicalHost { + id: Id, + category: HostCategory, +} + +impl Display for PhysicalHost { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!("Host[{}, {:?}]", self.id, self.category)) + } +} + +#[derive(Clone, Debug)] +enum HostRole { + Bootstrap, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +enum HostCategory { + Server, + Firewall, + Switch, +} + +#[derive(Clone, Debug)] +struct OkdInstallationPrepareScore { + bootstrap_host: LinkedValue, +} + +impl OkdInstallationPrepareScore { + fn new() -> Self { + Self { + bootstrap_host: LinkedValue::ContextKey(Id("discover-host.host".into())), + } + } +} + +impl Display for OkdInstallationPrepareScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("OkdInstallationPrepareScore") + } +} + +impl Score for OkdInstallationPrepareScore { + fn id(&self) -> Id { + Id("okd-installation-prepare".into()) + } + + fn name(&self) -> String { + "OkdInstallationPrepareScore".into() + } + + fn depends_on(&self) -> Vec> { + vec![ + dependency!(Id("opnsense-hosts".into()), OPNsenseHostsScore {}), + dependency!( + Id("discover-host".into()), + DiscoverHostForRoleScore::new(HostRole::Bootstrap) + ), + ] + } + + fn create_interpret(&self) -> Box> { + Box::new(OkdInstallationPrepareInterpret { + score: self.clone(), + }) + } +} + +#[derive(Debug)] +struct OkdInstallationPrepareInterpret { + score: OkdInstallationPrepareScore, +} + +#[async_trait] +impl Interpret for OkdInstallationPrepareInterpret { + async fn execute( + &self, + _inventory: &Inventory, + _topology: &T, + context: ExecutionContext, + ) -> Result { + let bootstrap_host = self.score.bootstrap_host.resolve(&context)?; + println!("Bootstrap host: {}", bootstrap_host); + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug)] +struct DiscoverHostForRoleScore { + role: HostRole, + hosts: LinkedValue>, +} + +impl DiscoverHostForRoleScore { + fn new(role: HostRole) -> Self { + Self { + role, + hosts: LinkedValue::ContextKey(Id("available-hosts.hosts".into())), + } + } +} + +impl Display for DiscoverHostForRoleScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!("DiscoverHostForRoleScore[{:?}]", self.role)) + } +} + +impl Score for DiscoverHostForRoleScore { + fn id(&self) -> Id { + Id(format!("discover-host-{:?}", self.role)) + } + + fn name(&self) -> String { + format!("DiscoverHostForRoleScore[{:?}]", self.role) + } + + fn depends_on(&self) -> Vec> { + vec![dependency!( + Id("available-hosts".into()), + DiscoverAvailableHostsScore {} + )] + } + + fn create_interpret(&self) -> Box> { + Box::new(DiscoverHostForRoleInterpret { + score: self.clone(), + }) + } +} + +#[derive(Debug)] +struct DiscoverHostForRoleInterpret { + score: DiscoverHostForRoleScore, +} + +#[async_trait] +impl Interpret for DiscoverHostForRoleInterpret { + async fn execute( + &self, + _inventory: &Inventory, + _topology: &T, + context: ExecutionContext, + ) -> Result { + let hosts = self.score.hosts.resolve(&context)?; + let host = hosts.first().unwrap(); + context.insert(Id("host".into()), host); + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug)] +struct DiscoverAvailableHostsScore {} + +impl Display for DiscoverAvailableHostsScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("DiscoverAvailableHostsScore") + } +} + +impl Score for DiscoverAvailableHostsScore { + fn id(&self) -> Id { + Id("discover-available-hosts".into()) + } + + fn name(&self) -> String { + "DiscoverAvailableHostsScore".into() + } + + fn depends_on(&self) -> Vec> { + vec![] + } + + fn create_interpret(&self) -> Box> { + Box::new(DiscoverAvailableHostsInterpret {}) + } +} + +#[derive(Debug)] +struct DiscoverAvailableHostsInterpret {} + +#[async_trait] +impl Interpret for DiscoverAvailableHostsInterpret { + async fn execute( + &self, + _inventory: &Inventory, + _topology: &T, + context: ExecutionContext, + ) -> Result { + context.insert( + Id("hosts".into()), + vec![PhysicalHost { + id: Id("host-1".into()), + category: HostCategory::Server, + }], + ); + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug)] +struct OPNsenseHostsScore {} + +impl Display for OPNsenseHostsScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("OPNsenseHostsScore") + } +} + +impl Score for OPNsenseHostsScore { + fn id(&self) -> Id { + Id("opnsense-hosts".into()) + } + + fn name(&self) -> String { + "OPNsenseHostsScore".into() + } + + fn depends_on(&self) -> Vec> { + vec![] + } + + fn create_interpret(&self) -> Box> { + Box::new(OPNsenseHostsInterpret {}) + } +} + +#[derive(Debug)] +struct OPNsenseHostsInterpret {} + +#[async_trait] +impl Interpret for OPNsenseHostsInterpret { + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + _context: ExecutionContext, + ) -> Result { + let cluster_domain = &topology.get_domain_name(); + let load_balancer_ip = &topology.get_load_balancer_ip(); + + inquire::Confirm::new(&format!( + "Set hostnames manually in your opnsense dnsmasq config : +*.apps.{cluster_domain} -> {load_balancer_ip} +api.{cluster_domain} -> {load_balancer_ip} +api-int.{cluster_domain} -> {load_balancer_ip} + +When you can dig them, confirm to continue. +" + )) + .prompt() + .expect("Prompt error"); + + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug)] +struct OkdInstallationBootstrapScore {} + +impl Display for OkdInstallationBootstrapScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("OkdInstallationBootstrapScore") + } +} + +impl Score for OkdInstallationBootstrapScore { + fn id(&self) -> Id { + Id("okd-installation-bootstrap".into()) + } + + fn name(&self) -> String { + "OkdInstallationBootstrapScore".into() + } + + fn depends_on(&self) -> Vec> { + vec![] + } + + fn create_interpret(&self) -> Box> { + Box::new(OkdInstallationBootstrapInterpret {}) + } +} + +#[derive(Debug)] +struct OkdInstallationBootstrapInterpret {} + +#[async_trait] +impl Interpret for OkdInstallationBootstrapInterpret { + async fn execute( + &self, + _inventory: &Inventory, + _topology: &T, + _context: ExecutionContext, + ) -> Result { + Ok(InterpretOutcome {}) + } +} + +#[derive(Clone, Debug)] +struct ProvisionHighAvailabilityClusterScore {} + +impl Display for ProvisionHighAvailabilityClusterScore { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("ProvisionHighAvailabilityClusterScore") + } +} + +impl Score for ProvisionHighAvailabilityClusterScore { + fn id(&self) -> Id { + Id("provision-high-availability-cluster".into()) + } + + fn name(&self) -> String { + "ProvisionHighAvailabilityClusterScore".into() + } + + fn depends_on(&self) -> Vec> { + vec![ + dependency!( + Id("okd-ipxe".into()), + OkdIpxeScore { + kickstart_filename: "inventory.kickstart".into(), + harmony_inventory_agent: "harmony_inventory_agent".into(), + } + ), + dependency!(Id("okd-installation".into()), OkdInstallationScore {}), + ] + } + + fn create_interpret(&self) -> Box> { + Box::new(ProvisionHighAvailabilityClusterInterpret {}) + } +} + +#[derive(Debug)] +struct ProvisionHighAvailabilityClusterInterpret {} + +#[async_trait] +impl Interpret for ProvisionHighAvailabilityClusterInterpret { + async fn execute( + &self, + _inventory: &Inventory, + _topology: &T, + _context: ExecutionContext, + ) -> Result { + Ok(InterpretOutcome {}) + } +} + +#[tokio::main] +async fn main() -> Result<(), InterpretError> { + let inventory = Inventory::autoload(); + let topology = HaClusterTopology; + + let maestro = Maestro::init( + inventory, + topology, + vec![Box::new(ProvisionHighAvailabilityClusterScore {})], + ); + + let _ = maestro.plan(); + maestro.execute().await?; + + Ok(()) +} diff --git a/harmony-core/Cargo.toml b/harmony-core/Cargo.toml new file mode 100644 index 0000000..c73c2b5 --- /dev/null +++ b/harmony-core/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "harmony-core" +version = "0.1.0" +edition = "2024" + +[dependencies] +async-trait = "0.1.89" +petgraph = "0.8.2" +serde = { version = "1.0.219", features = ["derive"] } +serde_json = "1.0.143" +tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread"] } diff --git a/harmony-core/src/lib.rs b/harmony-core/src/lib.rs new file mode 100644 index 0000000..de69227 --- /dev/null +++ b/harmony-core/src/lib.rs @@ -0,0 +1,319 @@ +use async_trait::async_trait; +use petgraph::{ + algo, + dot::Dot, + graph::{DiGraph, NodeIndex}, +}; +use serde::{Deserialize, Serialize, de::DeserializeOwned}; +use std::{ + collections::HashMap, + fmt::{Debug, Display}, + sync::{Arc, Mutex}, +}; + +#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)] +pub struct Id(pub String); + +impl Default for Id { + fn default() -> Self { + Id("default_id".into()) + } +} + +impl Display for Id { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.0.as_str()) + } +} + +#[derive(Clone, Debug)] +pub enum LinkedValue { + Value(T), + ContextKey(Id), +} + +impl LinkedValue +where + T: Clone + Serialize + DeserializeOwned, +{ + pub fn resolve(&self, context: &ExecutionContext) -> Result { + match self { + LinkedValue::Value(v) => Ok(v.clone()), + LinkedValue::ContextKey(key_id) => { + context.get::(key_id).ok_or_else(|| InterpretError { + msg: format!("Value for key '{}' not found in context.", key_id), + }) + } + } + } +} + +pub struct Inventory; + +impl Inventory { + pub fn autoload() -> Self { + Inventory + } +} + +pub trait Topology: Send + Sync { + fn name(&self) -> &str; +} + +#[derive(Debug, Clone, Copy)] +pub struct InterpretOutcome {} + +#[derive(Debug, Clone)] +pub struct InterpretError { + pub msg: String, +} + +pub struct Dependency(pub Id, pub Box>); + +impl Clone for Dependency { + fn clone(&self) -> Self { + Self(self.0.clone(), self.1.clone_box()) + } +} + +impl Display for Dependency { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&format!("{}", self.1)) + } +} + +#[macro_export] +macro_rules! dependency { + ($id:expr, $score:expr) => { + Dependency($id, Box::new($score)) + }; +} + +#[async_trait] +pub trait Score: Display + Send + Sync + CloneBoxScore { + fn id(&self) -> Id; + fn name(&self) -> String; + fn depends_on(&self) -> Vec>; + fn create_interpret(&self) -> Box>; +} + +pub trait CloneBoxScore { + fn clone_box(&self) -> Box>; +} + +impl CloneBoxScore for S +where + T: Topology, + S: Score + Clone + 'static, +{ + fn clone_box(&self) -> Box> { + Box::new(self.clone()) + } +} + +#[async_trait] +pub trait Interpret: Debug + Send { + async fn execute( + &self, + inventory: &Inventory, + topology: &T, + context: ExecutionContext, + ) -> Result; +} + +#[derive(Default)] +pub struct Context { + data: std::collections::HashMap, +} + +impl Context { + pub fn get(&self, id: &Id) -> Option { + self.data + .get(id) + .and_then(|value| serde_json::from_value(value.clone()).ok()) + } + + pub fn insert(&mut self, id: Id, data: T) { + if let Ok(value) = serde_json::to_value(data) { + self.data.insert(id, value); + } + } +} + +pub struct ExecutionContext { + id: Id, + context: Arc>, +} + +impl ExecutionContext { + fn wrap(context: Arc>, dependency: Dependency) -> Self { + Self { + id: dependency.0, + context, + } + } + + pub fn get(&self, id: &Id) -> Option { + let key = self.map_key(id); + + let context = self.context.lock().unwrap(); + context + .get(&key) + .and_then(|value: serde_json::Value| serde_json::from_value(value.clone()).ok()) + } + + pub fn insert(&self, id: Id, data: T) { + let key = self.map_key(&id); + + let mut context = self.context.lock().unwrap(); + context.insert(key, data); + } + + fn map_key(&self, id: &Id) -> Id { + Id(format!("{}.{}", self.id.0, id.0)) + } +} + +pub struct Maestro { + inventory: Inventory, + topology: T, + scores: DiGraph, &'static str>, +} + +impl Maestro { + pub fn init(inventory: Inventory, topology: T, scores: Vec>>) -> Self { + let scores = build_dag(scores).expect("Invalid scores DAG"); + + Self { + inventory, + topology, + scores, + } + } + + pub fn plan(&self) -> Result<(), String> { + let dot = Dot::new(&self.scores); + println!("{}", dot); + Ok(()) + } + + pub async fn execute(&self) -> Result<(), InterpretError> { + println!("Executing scores based on dependencies:"); + let context = Arc::new(Mutex::new(Context::default())); + + // Use petgraph's topological sort to get a guaranteed order. + let mut sorted_nodes = match algo::toposort(&self.scores, None) { + Ok(nodes) => nodes, + Err(e) => { + let cycle_node_id = &self.scores.node_weight(e.node_id()).unwrap().0; + return Err(InterpretError { + msg: format!( + "A cycle was detected in the graph, involving score ID: {}", + cycle_node_id + ), + }); + } + }; + sorted_nodes.reverse(); + + for node_index in sorted_nodes { + let dependency = self.scores.node_weight(node_index).ok_or(InterpretError { + msg: "Failed to get score from graph node.".into(), + })?; + let execution_context = ExecutionContext::wrap(context.clone(), dependency.clone()); + + println!(" - Executing {}", dependency.0); + + let interpret = dependency.1.create_interpret(); + interpret + .execute(&self.inventory, &self.topology, execution_context) + .await?; + } + + Ok(()) + } +} + +/// Builds a Directed Acyclic Graph (DAG) from a list of scores. +/// +/// This function uses the `petgraph` library to perform cycle validation. +/// It correctly handles scores that are only present as dependencies, +/// ensuring all nodes in the graph are discovered and validated. +/// +/// # Arguments +/// * `scores` - A vector of `Box` objects. These are the starting +/// nodes for the DAG. The function will recursively find all +/// transitive dependencies. +/// +/// # Returns +/// * `Ok(DiGraph>, ()>)` - The final DAG, represented as a +/// `petgraph::DiGraph`, if no cycles were found. The node weights are the `Box` objects. +/// * `Err(String)` - An error message if a cycle or duplicate ID was detected. +fn build_dag( + scores: Vec>>, +) -> Result, &'static str>, String> { + // A map to link each score's ID to its petgraph NodeIndex. + let mut id_to_node_index: HashMap = HashMap::new(); + + // The directed graph. The node weights are the score objects. + let mut graph: DiGraph, &str> = DiGraph::new(); + + // A queue for processing new nodes and their dependencies. + let mut processing_queue: Vec = Vec::new(); + + // First, add all initial scores to the graph and the queue. + for score in scores { + let id = score.id(); + let dependency = Dependency(id.clone(), score.clone_box()); + + // Check for duplicate root scores. + if id_to_node_index.contains_key(&id) { + return Err(format!("Duplicate score ID found: {}", id)); + } + + let node_index = graph.add_node(dependency); + id_to_node_index.insert(id, node_index); + processing_queue.push(node_index); + } + + // Now, process the queue to build the full graph. + while let Some(current_node_index) = processing_queue.pop() { + let current_node = graph.node_weight(current_node_index).unwrap(); + let current_id = ¤t_node.0.clone(); + + let dependencies = current_node.1.depends_on(); + + for dependency in dependencies { + let dep_id = Id(format!("{}.{}", current_id, &dependency.0)); + let dependency = Dependency(dep_id.clone(), dependency.1.clone_box()); + + let dep_node_index = match id_to_node_index.get(&dep_id) { + Some(index) => *index, + None => { + // This is a new dependency, add it to the graph. + let new_node_index = graph.add_node(dependency.clone()); + id_to_node_index.insert(dep_id.clone(), new_node_index); + processing_queue.push(new_node_index); + new_node_index + } + }; + + // Add an edge from the current score to its dependency. + graph.add_edge(current_node_index, dep_node_index, ""); + } + } + + // Use petgraph's `toposort` to validate the DAG. + match algo::toposort(&graph, None) { + Ok(_) => Ok(graph), + Err(e) => { + let cycle_node_weight = graph + .node_weight(e.node_id()) + .ok_or_else(|| "Cycle detected, but cycle node not found".to_string())?; + Err(format!( + "Cycle detected in dependencies, involving score with ID: {}", + cycle_node_weight.0 + )) + } + } +} diff --git a/harmony-derive/Cargo.toml b/harmony-derive/Cargo.toml new file mode 100644 index 0000000..64b4748 --- /dev/null +++ b/harmony-derive/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "harmony-derive" +version = "0.1.0" +edition = "2024" + +[lib] +proc-macro = true + +[dependencies] +quote = "1.0.40" +syn = "2.0.106" diff --git a/harmony-derive/src/lib.rs b/harmony-derive/src/lib.rs new file mode 100644 index 0000000..1af3c64 --- /dev/null +++ b/harmony-derive/src/lib.rs @@ -0,0 +1,58 @@ +use proc_macro::TokenStream; +use quote::quote; +use syn::{Attribute, Ident, ItemStruct, parse_macro_input}; + +// This was an attempt to get a derive macro to simplify implementing new scores +// +// The goal was to get a syntax similar to this: +// #[derive(Score)] +// #[interpret(MyInterpret)] +// #[dependencies(OtherScore("foo"), OtherScore("bar"), ExtraScore("baz"))] +// struct MyScore { +// foo: LinkedValue, +// bar: LinkedValue, +// baz: LinkedValue>, +// } + +#[proc_macro_derive(Score, attributes(interpret))] +pub fn score_derive(input: TokenStream) -> TokenStream { + let input_struct = parse_macro_input!(input as ItemStruct); + let score_name = &input_struct.ident; + + let interpret_name = get_interpret_name(&input_struct.attrs); + + let expanded = quote! { + #[derive(Debug)] + pub struct #interpret_name { + score: #score_name, + } + + impl crate::Score for #score_name { + fn create_interpret(&self) -> Box> { + Box::new(#interpret_name { + score: self.clone(), + }) + } + } + }; + + TokenStream::from(expanded) +} + +fn get_interpret_name(attrs: &Vec) -> Ident { + for attr in attrs { + if attr.path().is_ident("interpret") { + let nested_meta = attr + .parse_args_with( + syn::punctuated::Punctuated::::parse_terminated, + ) + .unwrap(); + if let Some(ident) = nested_meta.first() { + return ident.clone(); + } + } + } + + // Return a default or panic if the attribute is not found + Ident::new("DefaultInterpret", proc_macro::Span::call_site().into()) +}