Structure the Harmony core to rely on a DAG for declaring & executing Scores

This commit is contained in:
Ian Letourneau
2025-09-12 20:43:50 -04:00
commit a351fd1228
8 changed files with 1235 additions and 0 deletions

11
harmony-core/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "harmony-core"
version = "0.1.0"
edition = "2024"
[dependencies]
async-trait = "0.1.89"
petgraph = "0.8.2"
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.143"
tokio = { version = "1.47.1", features = ["macros", "rt-multi-thread"] }

319
harmony-core/src/lib.rs Normal file
View File

@@ -0,0 +1,319 @@
use async_trait::async_trait;
use petgraph::{
algo,
dot::Dot,
graph::{DiGraph, NodeIndex},
};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use std::{
collections::HashMap,
fmt::{Debug, Display},
sync::{Arc, Mutex},
};
#[derive(Debug, PartialEq, Eq, Clone, Hash, Serialize, Deserialize)]
pub struct Id(pub String);
impl Default for Id {
fn default() -> Self {
Id("default_id".into())
}
}
impl Display for Id {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.0.as_str())
}
}
#[derive(Clone, Debug)]
pub enum LinkedValue<T> {
Value(T),
ContextKey(Id),
}
impl<T> LinkedValue<T>
where
T: Clone + Serialize + DeserializeOwned,
{
pub fn resolve(&self, context: &ExecutionContext) -> Result<T, InterpretError> {
match self {
LinkedValue::Value(v) => Ok(v.clone()),
LinkedValue::ContextKey(key_id) => {
context.get::<T>(key_id).ok_or_else(|| InterpretError {
msg: format!("Value for key '{}' not found in context.", key_id),
})
}
}
}
}
pub struct Inventory;
impl Inventory {
pub fn autoload() -> Self {
Inventory
}
}
pub trait Topology: Send + Sync {
fn name(&self) -> &str;
}
#[derive(Debug, Clone, Copy)]
pub struct InterpretOutcome {}
#[derive(Debug, Clone)]
pub struct InterpretError {
pub msg: String,
}
pub struct Dependency<T>(pub Id, pub Box<dyn Score<T>>);
impl<T: Topology> Clone for Dependency<T> {
fn clone(&self) -> Self {
Self(self.0.clone(), self.1.clone_box())
}
}
impl<T: Topology> Display for Dependency<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&format!("{}", self.1))
}
}
#[macro_export]
macro_rules! dependency {
($id:expr, $score:expr) => {
Dependency($id, Box::new($score))
};
}
#[async_trait]
pub trait Score<T: Topology>: Display + Send + Sync + CloneBoxScore<T> {
fn id(&self) -> Id;
fn name(&self) -> String;
fn depends_on(&self) -> Vec<Dependency<T>>;
fn create_interpret(&self) -> Box<dyn Interpret<T>>;
}
pub trait CloneBoxScore<T: Topology> {
fn clone_box(&self) -> Box<dyn Score<T>>;
}
impl<S, T> CloneBoxScore<T> for S
where
T: Topology,
S: Score<T> + Clone + 'static,
{
fn clone_box(&self) -> Box<dyn Score<T>> {
Box::new(self.clone())
}
}
#[async_trait]
pub trait Interpret<T>: Debug + Send {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
context: ExecutionContext,
) -> Result<InterpretOutcome, InterpretError>;
}
#[derive(Default)]
pub struct Context {
data: std::collections::HashMap<Id, serde_json::Value>,
}
impl Context {
pub fn get<T: DeserializeOwned>(&self, id: &Id) -> Option<T> {
self.data
.get(id)
.and_then(|value| serde_json::from_value(value.clone()).ok())
}
pub fn insert<T: Serialize>(&mut self, id: Id, data: T) {
if let Ok(value) = serde_json::to_value(data) {
self.data.insert(id, value);
}
}
}
pub struct ExecutionContext {
id: Id,
context: Arc<Mutex<Context>>,
}
impl ExecutionContext {
fn wrap<T: Topology>(context: Arc<Mutex<Context>>, dependency: Dependency<T>) -> Self {
Self {
id: dependency.0,
context,
}
}
pub fn get<T: DeserializeOwned>(&self, id: &Id) -> Option<T> {
let key = self.map_key(id);
let context = self.context.lock().unwrap();
context
.get(&key)
.and_then(|value: serde_json::Value| serde_json::from_value(value.clone()).ok())
}
pub fn insert<T: Serialize>(&self, id: Id, data: T) {
let key = self.map_key(&id);
let mut context = self.context.lock().unwrap();
context.insert(key, data);
}
fn map_key(&self, id: &Id) -> Id {
Id(format!("{}.{}", self.id.0, id.0))
}
}
pub struct Maestro<T: Topology> {
inventory: Inventory,
topology: T,
scores: DiGraph<Dependency<T>, &'static str>,
}
impl<T: Topology> Maestro<T> {
pub fn init(inventory: Inventory, topology: T, scores: Vec<Box<dyn Score<T>>>) -> Self {
let scores = build_dag(scores).expect("Invalid scores DAG");
Self {
inventory,
topology,
scores,
}
}
pub fn plan(&self) -> Result<(), String> {
let dot = Dot::new(&self.scores);
println!("{}", dot);
Ok(())
}
pub async fn execute(&self) -> Result<(), InterpretError> {
println!("Executing scores based on dependencies:");
let context = Arc::new(Mutex::new(Context::default()));
// Use petgraph's topological sort to get a guaranteed order.
let mut sorted_nodes = match algo::toposort(&self.scores, None) {
Ok(nodes) => nodes,
Err(e) => {
let cycle_node_id = &self.scores.node_weight(e.node_id()).unwrap().0;
return Err(InterpretError {
msg: format!(
"A cycle was detected in the graph, involving score ID: {}",
cycle_node_id
),
});
}
};
sorted_nodes.reverse();
for node_index in sorted_nodes {
let dependency = self.scores.node_weight(node_index).ok_or(InterpretError {
msg: "Failed to get score from graph node.".into(),
})?;
let execution_context = ExecutionContext::wrap(context.clone(), dependency.clone());
println!(" - Executing {}", dependency.0);
let interpret = dependency.1.create_interpret();
interpret
.execute(&self.inventory, &self.topology, execution_context)
.await?;
}
Ok(())
}
}
/// Builds a Directed Acyclic Graph (DAG) from a list of scores.
///
/// This function uses the `petgraph` library to perform cycle validation.
/// It correctly handles scores that are only present as dependencies,
/// ensuring all nodes in the graph are discovered and validated.
///
/// # Arguments
/// * `scores` - A vector of `Box<dyn Score>` objects. These are the starting
/// nodes for the DAG. The function will recursively find all
/// transitive dependencies.
///
/// # Returns
/// * `Ok(DiGraph<Box<dyn Score<'a, T>>, ()>)` - The final DAG, represented as a
/// `petgraph::DiGraph`, if no cycles were found. The node weights are the `Box<dyn Score>` objects.
/// * `Err(String)` - An error message if a cycle or duplicate ID was detected.
fn build_dag<T: Topology>(
scores: Vec<Box<dyn Score<T>>>,
) -> Result<DiGraph<Dependency<T>, &'static str>, String> {
// A map to link each score's ID to its petgraph NodeIndex.
let mut id_to_node_index: HashMap<Id, NodeIndex> = HashMap::new();
// The directed graph. The node weights are the score objects.
let mut graph: DiGraph<Dependency<T>, &str> = DiGraph::new();
// A queue for processing new nodes and their dependencies.
let mut processing_queue: Vec<NodeIndex> = Vec::new();
// First, add all initial scores to the graph and the queue.
for score in scores {
let id = score.id();
let dependency = Dependency(id.clone(), score.clone_box());
// Check for duplicate root scores.
if id_to_node_index.contains_key(&id) {
return Err(format!("Duplicate score ID found: {}", id));
}
let node_index = graph.add_node(dependency);
id_to_node_index.insert(id, node_index);
processing_queue.push(node_index);
}
// Now, process the queue to build the full graph.
while let Some(current_node_index) = processing_queue.pop() {
let current_node = graph.node_weight(current_node_index).unwrap();
let current_id = &current_node.0.clone();
let dependencies = current_node.1.depends_on();
for dependency in dependencies {
let dep_id = Id(format!("{}.{}", current_id, &dependency.0));
let dependency = Dependency(dep_id.clone(), dependency.1.clone_box());
let dep_node_index = match id_to_node_index.get(&dep_id) {
Some(index) => *index,
None => {
// This is a new dependency, add it to the graph.
let new_node_index = graph.add_node(dependency.clone());
id_to_node_index.insert(dep_id.clone(), new_node_index);
processing_queue.push(new_node_index);
new_node_index
}
};
// Add an edge from the current score to its dependency.
graph.add_edge(current_node_index, dep_node_index, "");
}
}
// Use petgraph's `toposort` to validate the DAG.
match algo::toposort(&graph, None) {
Ok(_) => Ok(graph),
Err(e) => {
let cycle_node_weight = graph
.node_weight(e.node_id())
.ok_or_else(|| "Cycle detected, but cycle node not found".to_string())?;
Err(format!(
"Cycle detected in dependencies, involving score with ID: {}",
cycle_node_weight.0
))
}
}
}