diff --git a/brocade/Cargo.toml b/brocade/Cargo.toml index 3112918..01dc320 100644 --- a/brocade/Cargo.toml +++ b/brocade/Cargo.toml @@ -12,3 +12,5 @@ russh.workspace = true russh-keys.workspace = true tokio.workspace = true log.workspace = true +env_logger.workspace = true +regex = "1.11.3" diff --git a/brocade/examples/main.rs b/brocade/examples/main.rs new file mode 100644 index 0000000..d69060b --- /dev/null +++ b/brocade/examples/main.rs @@ -0,0 +1,49 @@ +use std::net::{IpAddr, Ipv4Addr}; + +use brocade::BrocadeClient; + +#[tokio::main] +async fn main() { + env_logger::init(); + + let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 250)); + let switch_addresses = vec![ip]; + + let brocade = BrocadeClient::init(&switch_addresses, "admin", "password", None) + .await + .expect("Brocade client failed to connect"); + + println!("Showing MAC Address table..."); + + let mac_adddresses = brocade.show_mac_address_table().await.unwrap(); + println!("VLAN\tMAC\t\t\tPORT"); + for mac in mac_adddresses { + println!("{}\t{}\t{}", mac.vlan, mac.mac_address, mac.port_name); + } + + println!("--------------"); + let channel_name = "HARMONY_LAG"; + println!("Clearing port channel '{channel_name}'..."); + + brocade.clear_port_channel(channel_name).await.unwrap(); + + println!("Cleared"); + + println!("--------------"); + println!("Finding next available channel..."); + + let channel_id = brocade.find_available_channel_id().await.unwrap(); + println!("Channel id: {channel_id}"); + + println!("--------------"); + let channel_name = "HARMONY_LAG"; + let ports = vec!["1/1/3".to_string()]; + println!("Creating port channel '{channel_name}' with ports {ports:?}'..."); + + brocade + .create_port_channel(channel_name, channel_id, &ports) + .await + .unwrap(); + + println!("Created"); +} diff --git a/brocade/src/lib.rs b/brocade/src/lib.rs index 552704e..864dd49 100644 --- a/brocade/src/lib.rs +++ b/brocade/src/lib.rs @@ -1,5 +1,6 @@ use std::{ borrow::Cow, + collections::HashSet, fmt::{self, Display}, sync::Arc, time::Duration, @@ -7,16 +8,15 @@ use std::{ use async_trait::async_trait; use harmony_types::net::{IpAddress, MacAddress}; -use log::{debug, info, trace}; -use russh::{ - ChannelMsg, - client::{Handle, Handler}, - kex::DH_G1_SHA1, -}; +use log::{debug, info}; +use regex::Regex; +use russh::{ChannelMsg, client::Handler, kex::DH_G1_SHA1}; use russh_keys::key::{self, SSH_RSA}; use std::str::FromStr; use tokio::time::{Instant, timeout}; +static PORT_CHANNEL_NAME: &str = "HARMONY_LAG"; + #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] pub struct MacAddressEntry { pub vlan: u16, @@ -24,9 +24,21 @@ pub struct MacAddressEntry { pub port_name: String, } +impl Display for MacAddressEntry { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str( + format!( + "VLAN\tMAC-Address\t\tPort\n{}\t{}\t{}", + self.vlan, self.mac_address, self.port_name + ) + .as_str(), + ) + } +} + pub struct BrocadeClient { - client: Handle, - elevated_user: UserConfig, + ip: IpAddress, + user: UserConfig, options: BrocadeOptions, } @@ -55,8 +67,8 @@ impl Default for TimeoutConfig { fn default() -> Self { Self { shell_ready: Duration::from_secs(3), - command_execution: Duration::from_secs(10), - cleanup: Duration::from_secs(3), + command_execution: Duration::from_secs(60), // Commands like `deploy` (for a LAG) can take a while + cleanup: Duration::from_secs(10), message_wait: Duration::from_millis(500), } } @@ -96,91 +108,127 @@ impl BrocadeClient { .ok_or_else(|| Error::ConfigurationError("No IP addresses provided".to_string()))?; let options = options.unwrap_or_default(); - let config = russh::client::Config { - preferred: options.ssh.preferred_algorithms.clone(), - ..Default::default() - }; - - let mut client = russh::client::connect(Arc::new(config), (*ip, 22), Client {}).await?; - - if !client.authenticate_password(username, password).await? { - return Err(Error::AuthenticationError( - "ssh authentication failed".to_string(), - )); - } Ok(Self { - client, - options, - elevated_user: UserConfig { + ip: *ip, + user: UserConfig { username: username.to_string(), password: password.to_string(), }, + options, }) } pub async fn show_mac_address_table(&self) -> Result, Error> { + info!("[Brocade] Showing MAC address table..."); + let output = self .run_command("show mac-address", ExecutionMode::Regular) .await?; output .lines() - .skip(1) + .skip(2) .filter_map(|line| self.parse_mac_entry(line)) .collect() } - pub async fn configure_port_channel(&self, ports: &[String]) -> Result { - info!("[Brocade] Configuring port-channel with ports: {ports:?}"); - - let channel_id = self.find_available_channel_id().await?; - let commands = self.build_port_channel_commands(channel_id, ports); - - self.run_commands(commands, ExecutionMode::Privileged) - .await?; - Ok(channel_id) - } - pub async fn find_available_channel_id(&self) -> Result { - debug!("[Brocade] Finding next available channel id..."); + info!("[Brocade] Finding next available channel id..."); let output = self.run_command("show lag", ExecutionMode::Regular).await?; - let mut used_ids: Vec = output + let re = Regex::new(r"=== LAG .* ID\s+(\d+)").expect("Invalid regex"); + + let used_ids: HashSet = output .lines() .filter_map(|line| { - if line.trim_start().chars().next()?.is_ascii_digit() { - u8::from_str(line.split_whitespace().next()?).ok() - } else { - None - } + re.captures(line) + .and_then(|c| c.get(1)) + .and_then(|id_match| id_match.as_str().parse().ok()) }) .collect(); - used_ids.sort_unstable(); + let mut next_id: u8 = 1; + loop { + if !used_ids.contains(&next_id) { + break; + } + next_id += 1; + } - let next_id = (0u8..) - .find(|&id| used_ids.binary_search(&id).is_err()) - .unwrap_or(0); - debug!("[Brocade] Found channel id '{next_id}'"); + info!("[Brocade] Found channel id: {next_id}"); Ok(next_id) } + pub async fn create_port_channel( + &self, + channel_name: &str, + channel_id: u8, + ports: &[String], + ) -> Result<(), Error> { + info!( + "[Brocade] Configuring port-channel '{channel_name} {channel_id}' with ports: {ports:?}" + ); + + let commands = self.build_port_channel_commands(channel_name, channel_id, ports); + self.run_commands(commands, ExecutionMode::Privileged) + .await?; + + info!("[Brocade] Port-channel '{PORT_CHANNEL_NAME}' configured."); + Ok(()) + } + + pub async fn clear_port_channel(&self, channel_name: &str) -> Result<(), Error> { + debug!("[Brocade] Clearing port-channel: {channel_name}"); + + let commands = vec![ + "configure terminal".to_string(), + format!("no lag {channel_name}"), + "write memory".to_string(), + ]; + self.run_commands(commands, ExecutionMode::Privileged) + .await?; + + Ok(()) + } + + fn build_port_channel_commands( + &self, + channel_name: &str, + channel_id: u8, + ports: &[String], + ) -> Vec { + let mut commands = vec![ + "configure terminal".to_string(), + format!("lag {channel_name} static id {channel_id}"), + ]; + + for port in ports { + commands.push(format!("ports ethernet {port}")); + } + + commands.push(format!("primary-port {}", ports.first().unwrap())); + commands.push("deploy".into()); + commands.push("exit".into()); + commands.push("write memory".into()); + commands.push("exit".into()); + + commands + } + async fn run_command(&self, command: &str, mode: ExecutionMode) -> Result { if self.should_skip_command(command) { return Ok(String::new()); } - let mut channel = self.client.channel_open_session().await?; - self.setup_channel(&mut channel, mode).await?; + let mut channel = self.open_session(&mode).await?; let output = self .execute_command_in_session(&mut channel, command) .await?; let cleaned = self.clean_brocade_output(&output, command); - debug!("[Brocade] Command output:\n{cleaned}"); - self.cleanup_channel(&mut channel).await; // Cleanup/close the channel + self.close_session(channel, &mode).await?; Ok(cleaned) } @@ -190,33 +238,51 @@ impl BrocadeClient { return Ok(()); } - let mut channel = self.client.channel_open_session().await?; - self.setup_channel(&mut channel, mode).await?; + let mut channel = self.open_session(&mode).await?; for command in commands { if self.should_skip_command(&command) { continue; } - let output = self - .execute_command_in_session(&mut channel, &command) + self.execute_command_in_session(&mut channel, &command) .await?; - let cleaned = self.clean_brocade_output(&output, &command); - - debug!("[Brocade] Command output:\n{cleaned}"); - self.check_for_command_errors(&cleaned, &command)?; } - channel.data(&b"exit\n"[..]).await?; - self.cleanup_channel(&mut channel).await; + self.close_session(channel, &mode).await?; Ok(()) } + async fn open_session( + &self, + mode: &ExecutionMode, + ) -> Result, Error> { + let config = russh::client::Config { + preferred: self.options.ssh.preferred_algorithms.clone(), + ..Default::default() + }; + + let mut client = russh::client::connect(Arc::new(config), (self.ip, 22), Client {}).await?; + if !client + .authenticate_password(&self.user.username, &self.user.password) + .await? + { + return Err(Error::AuthenticationError( + "ssh authentication failed".to_string(), + )); + } + + let mut channel = client.channel_open_session().await?; + self.setup_channel(&mut channel, mode).await?; + + Ok(channel) + } + async fn setup_channel( &self, channel: &mut russh::Channel, - mode: ExecutionMode, + mode: &ExecutionMode, ) -> Result<(), Error> { // Setup PTY and shell channel @@ -246,9 +312,12 @@ impl BrocadeClient { tokio::time::sleep(Duration::from_millis(100)).await; let output = self.collect_command_output(channel).await?; + let output = String::from_utf8(output) + .map_err(|_| Error::UnexpectedError("Invalid UTF-8 in command output".to_string()))?; - String::from_utf8(output) - .map_err(|_| Error::UnexpectedError("Invalid UTF-8 in command output".to_string())) + self.check_for_command_errors(&output, command)?; + + Ok(output) } async fn try_elevate_session( @@ -272,13 +341,13 @@ impl BrocadeClient { if output.contains("User Name:") { channel - .data(format!("{}\n", self.elevated_user.username).as_bytes()) + .data(format!("{}\n", self.user.username).as_bytes()) .await?; buffer.clear(); } else if output.contains("Password:") { // Note: Brocade might not echo the password field channel - .data(format!("{}\n", self.elevated_user.password).as_bytes()) + .data(format!("{}\n", self.user.password).as_bytes()) .await?; buffer.clear(); } else if output.contains('>') { @@ -339,34 +408,81 @@ impl BrocadeClient { ) -> Result, Error> { let mut output = Vec::new(); let start = Instant::now(); - let mut command_complete = false; - while start.elapsed() < self.options.timeouts.command_execution && !command_complete { - match timeout(Duration::from_secs(2), channel.wait()).await { + let read_timeout = Duration::from_millis(500); + + let log_interval = Duration::from_secs(3); + let mut last_log = Instant::now(); + + loop { + if start.elapsed() > self.options.timeouts.command_execution { + return Err(Error::TimeoutError( + "Timeout waiting for command completion.".to_string(), + )); + } + + if start.elapsed() > Duration::from_secs(5) && last_log.elapsed() > log_interval { + info!("[Brocade] Waiting for command output..."); + last_log = Instant::now(); + } + + match timeout(read_timeout, channel.wait()).await { Ok(Some(ChannelMsg::Data { data } | ChannelMsg::ExtendedData { data, .. })) => { output.extend_from_slice(&data); - let current = String::from_utf8_lossy(&output); - if current.ends_with('>') || current.ends_with("# ") { - command_complete = true; + let current_output = String::from_utf8_lossy(&output); + if current_output.contains('>') || current_output.contains('#') { + return Ok(output); } } + Ok(Some(ChannelMsg::Eof | ChannelMsg::Close)) => { - command_complete = true; + return Ok(output); } + Ok(Some(ChannelMsg::ExitStatus { exit_status })) => { debug!("[Brocade] Command exit status: {exit_status}"); + continue; + } + + Ok(Some(_)) => continue, // Ignore other channel messages + Ok(None) | Err(_) => { + if output.is_empty() { + if let Ok(None) = timeout(read_timeout, channel.wait()).await { + // Check one last time if channel is closed + break; + } + continue; + } + + // If we received a timeout (Err) and have output, wait a short time to check for a late prompt + tokio::time::sleep(Duration::from_millis(100)).await; + + let current_output = String::from_utf8_lossy(&output); + if current_output.contains('>') || current_output.contains('#') { + return Ok(output); + } + + continue; } - Ok(Some(_)) => continue, - Ok(None) => break, - Err(_) => break, } } Ok(output) } - async fn cleanup_channel(&self, channel: &mut russh::Channel) { + async fn close_session( + &self, + mut channel: russh::Channel, + mode: &ExecutionMode, + ) -> Result<(), Error> { + debug!("[Brocade] Closing session..."); + + channel.data(&b"exit\n"[..]).await?; + if let ExecutionMode::Privileged = mode { + channel.data(&b"exit\n"[..]).await?; // Previous exit closed "enable" mode + } + let start = Instant::now(); while start.elapsed() < self.options.timeouts.cleanup { @@ -376,10 +492,14 @@ impl BrocadeClient { Ok(None) | Err(_) => break, } } + + debug!("[Brocade] Session '{}' closed, bye bye.", channel.id()); + + Ok(()) } fn should_skip_command(&self, command: &str) -> bool { - if !command.starts_with("show") && self.options.dry_run { + if (command.starts_with("write") || command.starts_with("deploy")) && self.options.dry_run { info!("[Brocade] Dry-run mode enabled, skipping command: {command}"); return true; } @@ -387,14 +507,26 @@ impl BrocadeClient { } fn parse_mac_entry(&self, line: &str) -> Option> { + debug!("[Brocade] Parsing mac address entry: {line}"); let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() < 3 { return None; } - let vlan = u16::from_str(parts[0]).ok()?; - let mac_address = MacAddress::try_from(parts[1].to_string()).ok()?; - let port_name = parts[2].to_string(); + let (vlan, mac_address, port_name) = match parts.len() { + 3 => ( + // Format: VLAN/MAC/Port + u16::from_str(parts[0]).ok()?, + parse_brocade_mac_address(parts[1]).ok()?, + parts[2].to_string(), + ), + _ => ( + // Format: MAC/Port/Type/Index, default VLAN usually 1 + 1, + parse_brocade_mac_address(parts[0]).ok()?, + parts[1].to_string(), + ), + }; Some(Ok(MacAddressEntry { vlan, @@ -403,28 +535,8 @@ impl BrocadeClient { })) } - fn build_port_channel_commands(&self, channel_id: u8, ports: &[String]) -> Vec { - let mut commands = vec![ - "configure terminal".to_string(), - format!("interface Port-channel {channel_id}"), - "no ip address".to_string(), - "exit".to_string(), - ]; - - for port in ports { - commands.extend([ - format!("interface {port}"), - format!("channel-group {channel_id} mode active"), - "exit".to_string(), - ]); - } - - commands.push("write memory".to_string()); - commands - } - fn clean_brocade_output(&self, raw_output: &str, command: &str) -> String { - trace!("[Brocade] Received raw output:\n{raw_output}"); + debug!("[Brocade] Received raw output:\n{raw_output}"); let lines: Vec<&str> = raw_output.lines().collect(); let mut cleaned_lines = Vec::new(); @@ -465,7 +577,10 @@ impl BrocadeClient { cleaned_lines.pop(); } - cleaned_lines.join("\n") + let output = cleaned_lines.join("\n"); + debug!("[Brocade] Command output:\n{output}"); + + output } fn is_prompt_line(&self, line: &str) -> bool { @@ -484,16 +599,14 @@ impl BrocadeClient { "configuration error", "failed to", "error:", - "warning:", ]; let output_lower = output.to_lowercase(); - if let Some(pattern) = ERROR_PATTERNS.iter().find(|&&p| output_lower.contains(p)) { + if ERROR_PATTERNS.iter().any(|&p| output_lower.contains(p)) { return Err(Error::CommandError(format!( - "Command '{}' failed with error containing '{}': {}", + "Command '{}' failed: {}", command, - pattern, output.trim() ))); } @@ -509,6 +622,28 @@ impl BrocadeClient { } } +fn parse_brocade_mac_address(value: &str) -> Result { + // Remove periods from the Brocade format + let cleaned_mac = value.replace('.', ""); + + // Ensure the cleaned string has the correct length for a MAC address + if cleaned_mac.len() != 12 { + return Err(format!("Invalid MAC address: {value}",)); + } + + // Parse the hexadecimal string into bytes + let mut bytes = [0u8; 6]; + for (i, pair) in cleaned_mac.as_bytes().chunks(2).enumerate() { + let byte_str = + std::str::from_utf8(pair).map_err(|_| "Invalid UTF-8 sequence".to_string())?; + + bytes[i] = u8::from_str_radix(byte_str, 16) + .map_err(|_| format!("Invalid hex byte in MAC address: {value}"))?; + } + + Ok(MacAddress(bytes)) +} + struct Client; #[async_trait] @@ -541,7 +676,7 @@ impl Display for Error { Error::ConfigurationError(msg) => write!(f, "Configuration error: {msg}"), Error::TimeoutError(msg) => write!(f, "Timeout error: {msg}"), Error::UnexpectedError(msg) => write!(f, "Unexpected error: {msg}"), - Error::CommandError(msg) => write!(f, "Command failed: {msg}"), + Error::CommandError(msg) => write!(f, "{msg}"), } } } @@ -555,7 +690,7 @@ impl From for String { impl std::error::Error for Error {} impl From for Error { - fn from(_value: russh::Error) -> Self { - Error::NetworkError("Russh client error".to_string()) + fn from(value: russh::Error) -> Self { + Error::NetworkError(format!("Russh client error: {value}")) } } diff --git a/harmony/src/domain/topology/ha_cluster.rs b/harmony/src/domain/topology/ha_cluster.rs index 550d0a7..1052733 100644 --- a/harmony/src/domain/topology/ha_cluster.rs +++ b/harmony/src/domain/topology/ha_cluster.rs @@ -318,7 +318,11 @@ impl HAClusterTopology { Ok(Box::new(client)) } - async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> { + async fn configure_port_channel( + &self, + host: &PhysicalHost, + config: &HostNetworkConfig, + ) -> Result<(), SwitchError> { debug!("Configuring port channel: {config:#?}"); let client = self.get_switch_client().await?; @@ -329,7 +333,7 @@ impl HAClusterTopology { .collect(); client - .configure_port_channel(switch_ports) + .configure_port_channel(&format!("Harmony_{}", host.id), switch_ports) .await .map_err(|e| SwitchError::new(format!("Failed to configure switch: {e}")))?; @@ -526,8 +530,8 @@ impl Switch for HAClusterTopology { host: &PhysicalHost, config: HostNetworkConfig, ) -> Result<(), SwitchError> { - self.configure_bond(host, &config).await?; - self.configure_port_channel(&config).await + // self.configure_bond(host, &config).await?; + self.configure_port_channel(host, &config).await } } diff --git a/harmony/src/domain/topology/network.rs b/harmony/src/domain/topology/network.rs index 98424f5..81487bb 100644 --- a/harmony/src/domain/topology/network.rs +++ b/harmony/src/domain/topology/network.rs @@ -222,7 +222,12 @@ impl Error for SwitchError {} #[async_trait] pub trait SwitchClient: Send + Sync { async fn find_port(&self, mac_address: &MacAddress) -> Result, SwitchError>; - async fn configure_port_channel(&self, switch_ports: Vec) -> Result; + + async fn configure_port_channel( + &self, + channel_name: &str, + switch_ports: Vec, + ) -> Result; } #[cfg(test)] diff --git a/harmony/src/infra/brocade.rs b/harmony/src/infra/brocade.rs index e1afc3a..fc98e3d 100644 --- a/harmony/src/infra/brocade.rs +++ b/harmony/src/infra/brocade.rs @@ -29,7 +29,7 @@ impl SwitchClient for BrocadeSwitchClient { .brocade .show_mac_address_table() .await - .map_err(|e| SwitchError::new(format!("Failed to get mac address table: {e}")))?; + .map_err(|e| SwitchError::new(format!("{e}")))?; Ok(table .iter() @@ -37,11 +37,23 @@ impl SwitchClient for BrocadeSwitchClient { .map(|entry| entry.port_name.clone())) } - async fn configure_port_channel(&self, switch_ports: Vec) -> Result { - self.brocade - .configure_port_channel(&switch_ports) + async fn configure_port_channel( + &self, + channel_name: &str, + switch_ports: Vec, + ) -> Result { + let channel_id = self + .brocade + .find_available_channel_id() .await - .map_err(|e| SwitchError::new(format!("Failed to configure port channel: {e}"))) + .map_err(|e| SwitchError::new(format!("{e}")))?; + + self.brocade + .create_port_channel(channel_name, channel_id, &switch_ports) + .await + .map_err(|e| SwitchError::new(format!("{e}")))?; + + Ok(channel_id) } } diff --git a/harmony/src/modules/okd/host_network.rs b/harmony/src/modules/okd/host_network.rs index f903f6b..873830d 100644 --- a/harmony/src/modules/okd/host_network.rs +++ b/harmony/src/modules/okd/host_network.rs @@ -89,9 +89,10 @@ impl Interpret for HostNetworkConfigurationInterpret { if !switch_ports.is_empty() { configured_host_count += 1; - let _ = topology + topology .configure_host_network(host, HostNetworkConfig { switch_ports }) - .await; + .await + .map_err(|e| InterpretError::new(format!("Failed to configure host: {e}")))?; } }