From 1199564122a1e327f1f050e8b9f51013bb47bd3c Mon Sep 17 00:00:00 2001 From: Ian Letourneau Date: Sun, 28 Sep 2025 15:58:13 -0400 Subject: [PATCH] replace command execution by interactive shell --- brocade/src/lib.rs | 485 +++++++++++++++++++++++++++++---------------- 1 file changed, 318 insertions(+), 167 deletions(-) diff --git a/brocade/src/lib.rs b/brocade/src/lib.rs index 411d9de..3a47d5a 100644 --- a/brocade/src/lib.rs +++ b/brocade/src/lib.rs @@ -2,17 +2,20 @@ use std::{ borrow::Cow, fmt::{self, Display}, sync::Arc, + time::Duration, }; use async_trait::async_trait; use harmony_types::net::{IpAddress, MacAddress}; use log::{debug, info}; use russh::{ + ChannelMsg, client::{Handle, Handler}, kex::DH_G1_SHA1, }; use russh_keys::key::{self, SSH_RSA}; use std::str::FromStr; +use tokio::time::{Instant, timeout}; #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)] pub struct MacAddressEntry { @@ -23,13 +26,40 @@ pub struct MacAddressEntry { pub struct BrocadeClient { client: Handle, + elevated_user: UserConfig, options: BrocadeOptions, } +#[derive(Default, Clone, Debug)] +struct UserConfig { + username: String, + password: String, +} + #[derive(Default, Clone, Debug)] pub struct BrocadeOptions { pub dry_run: bool, pub ssh: SshOptions, + pub timeouts: TimeoutConfig, +} + +#[derive(Clone, Debug)] +pub struct TimeoutConfig { + pub shell_ready: Duration, + pub command_execution: Duration, + pub cleanup: Duration, + pub message_wait: Duration, +} + +impl Default for TimeoutConfig { + fn default() -> Self { + Self { + shell_ready: Duration::from_secs(3), + command_execution: Duration::from_secs(10), + cleanup: Duration::from_secs(3), + message_wait: Duration::from_millis(500), + } + } } #[derive(Clone, Debug)] @@ -56,87 +86,51 @@ impl BrocadeClient { password: &str, options: Option, ) -> Result { - if ip_addresses.is_empty() { - return Err(Error::ConfigurationError( - "No IP addresses provided".to_string(), - )); - } + let ip = ip_addresses + .first() + .ok_or_else(|| Error::ConfigurationError("No IP addresses provided".to_string()))?; - let ip = ip_addresses[0]; // FIXME: Find a better way to get master switch IP address let options = options.unwrap_or_default(); - let config = russh::client::Config { preferred: options.ssh.preferred_algorithms.clone(), ..Default::default() }; - let mut client = russh::client::connect(Arc::new(config), (ip, 22), Client {}).await?; - match client.authenticate_password(username, password).await? { - true => Ok(Self { client, options }), - false => Err(Error::AuthenticationError( + let mut client = russh::client::connect(Arc::new(config), (*ip, 22), Client {}).await?; + + if !client.authenticate_password(username, password).await? { + return Err(Error::AuthenticationError( "ssh authentication failed".to_string(), - )), + )); } + + Ok(Self { + client, + options, + elevated_user: UserConfig { + username: username.to_string(), + password: password.to_string(), + }, + }) } pub async fn show_mac_address_table(&self) -> Result, Error> { let output = self.run_command("show mac-address").await?; - let mut entries = Vec::new(); - // The Brocade output usually has a header and then one entry per line. - // We will skip the header and parse each line. - // Sample line: "1234 AA:BB:CC:DD:EE:F1 GigabitEthernet1/1/1" - for line in output.lines().skip(1) { - // Skip the header row - let parts: Vec<&str> = line.split_whitespace().collect(); - if parts.len() >= 3 { - // Assuming the format is: - if let Ok(vlan) = u16::from_str(parts[0]) { - let mac = MacAddress::try_from(parts[1].to_string()); - let port = parts[2].to_string(); - - if let Ok(mac_address) = mac { - entries.push(MacAddressEntry { - vlan, - mac_address, - port_name: port, - }); - } - } - } - } - - Ok(entries) + output + .lines() + .skip(1) + .filter_map(|line| self.parse_mac_entry(line)) + .collect() } pub async fn configure_port_channel(&self, ports: &[String]) -> Result { info!("[Brocade] Configuring port-channel with ports: {ports:?}"); let channel_id = self.find_available_channel_id().await?; - let mut commands = Vec::new(); - - // Start configuration mode. - commands.push("configure terminal".to_string()); - - // Create the port channel interface. - commands.push(format!("interface Port-channel {channel_id}")); - commands.push("no ip address".to_string()); - commands.push("exit".to_string()); - - // Configure each physical port to join the channel. - for port in ports { - commands.push(format!("interface {port}")); - // 'channel-group' command to add the interface to the port channel. - // Using 'mode active' enables LACP. - commands.push(format!("channel-group {channel_id} mode active")); - commands.push("exit".to_string()); - } - - // Save the configuration. - commands.push("write memory".to_string()); + let commands = self.build_port_channel_commands(channel_id, ports); self.run_commands(commands).await?; - Ok(channel_id) } @@ -144,150 +138,305 @@ impl BrocadeClient { debug!("[Brocade] Finding next available channel id..."); let output = self.run_command("show lag").await?; - let mut used_ids = Vec::new(); - - // Sample output line: "3 Po3(SU) LACP Eth Yes 128/128 active " - // We're looking for the ID, which is the first number. - for line in output.lines() { - if line.trim().starts_with(|c: char| c.is_ascii_digit()) { - let parts: Vec<&str> = line.split_whitespace().collect(); - if let Ok(id) = u8::from_str(parts[0]) { - used_ids.push(id); + let mut used_ids: Vec = output + .lines() + .filter_map(|line| { + if line.trim_start().chars().next()?.is_ascii_digit() { + u8::from_str(line.split_whitespace().next()?).ok() + } else { + None } - } - } + }) + .collect(); - // Sort the used IDs to find the next available number. - used_ids.sort(); - - let mut next_id = 0; - for &id in &used_ids { - if id == next_id { - next_id += 1; - } else { - // Found a gap, so this is our ID. - return Ok(next_id); - } - } + used_ids.sort_unstable(); + let next_id = (0u8..) + .find(|&id| used_ids.binary_search(&id).is_err()) + .unwrap_or(0); debug!("[Brocade] Found channel id '{next_id}'"); Ok(next_id) } async fn run_command(&self, command: &str) -> Result { - if !command.starts_with("show") && self.options.dry_run { - info!("[Brocade] Dry-run mode enabled, skipping command: {command}"); - return Ok("".into()); + if self.should_skip_command(command) { + return Ok(String::new()); } - debug!("[Brocade] Running command: '{command}'..."); - let mut channel = self.client.channel_open_session().await?; - let mut output = Vec::new(); + self.setup_channel(&mut channel).await?; + self.wait_for_shell_ready(&mut channel).await?; - channel.exec(true, command).await?; + let output = self + .execute_command_in_session(&mut channel, command) + .await?; + let cleaned = self.clean_brocade_output(&output, command); - loop { - let Some(msg) = channel.wait().await else { - break; - }; + debug!("[Brocade] Command output:\n{cleaned}"); + self.cleanup_channel(&mut channel).await; // Cleanup/close the channel - match msg { - russh::ChannelMsg::ExtendedData { ref data, .. } - | russh::ChannelMsg::Data { ref data } => { - output.append(&mut data.to_vec()); - } - russh::ChannelMsg::ExitStatus { exit_status } => { - if exit_status != 0 { - let output_str = String::from_utf8(output).unwrap_or_default(); - return Err(Error::CommandError(format!( - "Command failed with exit status {exit_status}, output {output_str}", - ))); - } - } - russh::ChannelMsg::Eof => { - channel.close().await?; - } - russh::ChannelMsg::Close => { - break; - } - russh::ChannelMsg::Success | russh::ChannelMsg::WindowAdjusted { .. } => {} - _ => { - return Err(Error::UnexpectedError(format!( - "Russh got unexpected msg {msg:?}" - ))); - } - } - } - - let output = String::from_utf8(output).expect("Output should be UTF-8 compatible"); - debug!("[Brocade] Command output:\n{output}"); - Ok(output) + Ok(cleaned) } async fn run_commands(&self, commands: Vec) -> Result<(), Error> { - // Execute commands sequentially and check for errors immediately. + if commands.is_empty() { + return Ok(()); + } + + let mut channel = self.client.channel_open_session().await?; + self.setup_channel(&mut channel).await?; + self.wait_for_shell_ready(&mut channel).await?; + for command in commands { - if !command.starts_with("show") && self.options.dry_run { - info!("[Brocade] Dry-run mode enabled, skipping command: {command}"); + if self.should_skip_command(&command) { continue; } - debug!("[Brocade] Running command: '{command}'..."); + let output = self + .execute_command_in_session(&mut channel, &command) + .await?; + let cleaned = self.clean_brocade_output(&output, &command); - let mut channel = self.client.channel_open_session().await?; - let mut output = Vec::new(); - let mut close_received = false; + debug!("[Brocade] Command output:\n{cleaned}"); + self.check_for_command_errors(&cleaned, &command)?; + } - channel.exec(true, command.as_str()).await?; + channel.data(&b"exit\n"[..]).await?; + self.cleanup_channel(&mut channel).await; - loop { - let Some(msg) = channel.wait().await else { - break; - }; + Ok(()) + } - match msg { - russh::ChannelMsg::ExtendedData { ref data, .. } - | russh::ChannelMsg::Data { ref data } => { - output.append(&mut data.to_vec()); - } - russh::ChannelMsg::ExitStatus { exit_status } => { - if exit_status != 0 { - let output_str = String::from_utf8(output).unwrap_or_default(); - return Err(Error::CommandError(format!( - "Command failed with exit status {exit_status}: {output_str}", - ))); - } - } - russh::ChannelMsg::Eof => { - channel.close().await?; - } - russh::ChannelMsg::Close => { - close_received = true; - break; - } - russh::ChannelMsg::Success | russh::ChannelMsg::WindowAdjusted { .. } => {} - _ => { - return Err(Error::UnexpectedError(format!( - "Russh got unexpected msg {msg:?}" - ))); + async fn setup_channel( + &self, + channel: &mut russh::Channel, + ) -> Result<(), Error> { + // Setup PTY and shell + channel + .request_pty(false, "vt100", 80, 24, 0, 0, &[]) + .await?; + channel.request_shell(false).await?; + Ok(()) + } + + async fn execute_command_in_session( + &self, + channel: &mut russh::Channel, + command: &str, + ) -> Result { + debug!("[Brocade] Running command: '{command}'..."); + + channel.data(format!("{}\n", command).as_bytes()).await?; + tokio::time::sleep(Duration::from_millis(100)).await; + + let output = self.collect_command_output(channel).await?; + + String::from_utf8(output) + .map_err(|_| Error::UnexpectedError("Invalid UTF-8 in command output".to_string())) + } + + async fn wait_for_shell_ready( + &self, + channel: &mut russh::Channel, + ) -> Result<(), Error> { + let mut buffer = Vec::new(); + let start = Instant::now(); + + while start.elapsed() < self.options.timeouts.shell_ready { + match timeout(self.options.timeouts.message_wait, channel.wait()).await { + Ok(Some(ChannelMsg::Data { data })) => { + buffer.extend_from_slice(&data); + let output = String::from_utf8_lossy(&buffer); + if output.contains('>') || output.contains('#') { + debug!("[Brocade] Shell ready: {}", output.trim()); + return Ok(()); } } + Ok(Some(_)) => continue, + Ok(None) => break, + Err(_) => continue, + } + } + Ok(()) + } + + async fn collect_command_output( + &self, + channel: &mut russh::Channel, + ) -> Result, Error> { + let mut output = Vec::new(); + let start = Instant::now(); + let mut command_complete = false; + + while start.elapsed() < self.options.timeouts.command_execution && !command_complete { + match timeout(Duration::from_secs(2), channel.wait()).await { + Ok(Some(ChannelMsg::Data { data } | ChannelMsg::ExtendedData { data, .. })) => { + output.extend_from_slice(&data); + let current = String::from_utf8_lossy(&output); + + if current.ends_with('>') || current.ends_with("# ") { + command_complete = true; + } + } + Ok(Some(ChannelMsg::Eof | ChannelMsg::Close)) => { + command_complete = true; + } + Ok(Some(ChannelMsg::ExitStatus { exit_status })) => { + debug!("[Brocade] Command exit status: {exit_status}"); + } + Ok(Some(_)) => continue, + Ok(None) => break, + Err(_) => break, + } + } + + Ok(output) + } + + async fn cleanup_channel(&self, channel: &mut russh::Channel) { + let start = Instant::now(); + + while start.elapsed() < self.options.timeouts.cleanup { + match timeout(self.options.timeouts.message_wait, channel.wait()).await { + Ok(Some(ChannelMsg::Close)) => break, + Ok(Some(_)) => continue, + Ok(None) | Err(_) => break, + } + } + } + + fn should_skip_command(&self, command: &str) -> bool { + if !command.starts_with("show") && self.options.dry_run { + info!("[Brocade] Dry-run mode enabled, skipping command: {command}"); + return true; + } + false + } + + fn parse_mac_entry(&self, line: &str) -> Option> { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() < 3 { + return None; + } + + let vlan = u16::from_str(parts[0]).ok()?; + let mac_address = MacAddress::try_from(parts[1].to_string()).ok()?; + let port_name = parts[2].to_string(); + + Some(Ok(MacAddressEntry { + vlan, + mac_address, + port_name, + })) + } + + fn build_port_channel_commands(&self, channel_id: u8, ports: &[String]) -> Vec { + let mut commands = vec![ + "configure terminal".to_string(), + format!("interface Port-channel {channel_id}"), + "no ip address".to_string(), + "exit".to_string(), + ]; + + for port in ports { + commands.extend([ + format!("interface {port}"), + format!("channel-group {channel_id} mode active"), + "exit".to_string(), + ]); + } + + commands.push("write memory".to_string()); + commands + } + + fn clean_brocade_output(&self, raw_output: &str, command: &str) -> String { + debug!("[Brocade] Received command output:\n{raw_output}"); + + let lines: Vec<&str> = raw_output.lines().collect(); + let mut cleaned_lines = Vec::new(); + let mut output_started = false; + let mut command_echo_found = false; + + for line in lines { + let trimmed = line.trim(); + + if !output_started && trimmed.is_empty() { + continue; } - if !close_received { - return Err(Error::UnexpectedError(format!( - "Channel closed without receiving a final CLOSE message for command: {}", - command - ))); + if !command_echo_found && trimmed.contains(command) { + command_echo_found = true; + output_started = true; + continue; } + + if self.is_prompt_line(trimmed) { + if output_started && !cleaned_lines.is_empty() { + break; + } + continue; + } + + if trimmed == "exit" { + break; + } + + if output_started && !trimmed.is_empty() { + cleaned_lines.push(line); + } + } + + // Remove trailing empty lines + while cleaned_lines.last() == Some(&"") { + cleaned_lines.pop(); + } + + cleaned_lines.join("\n") + } + + fn is_prompt_line(&self, line: &str) -> bool { + line.ends_with('#') || line.ends_with('>') || line.starts_with("SSH@") + } + + fn check_for_command_errors(&self, output: &str, command: &str) -> Result<(), Error> { + const ERROR_PATTERNS: &[&str] = &[ + "invalid input", + "syntax error", + "command not found", + "unknown command", + "permission denied", + "access denied", + "authentication failed", + "configuration error", + "failed to", + "error:", + "warning:", + ]; + + let output_lower = output.to_lowercase(); + + if let Some(pattern) = ERROR_PATTERNS.iter().find(|&&p| output_lower.contains(p)) { + return Err(Error::CommandError(format!( + "Command '{}' failed with error containing '{}': {}", + command, + pattern, + output.trim() + ))); + } + + if !command.starts_with("show") && output.trim().is_empty() { + return Err(Error::CommandError(format!( + "Command '{}' produced no output, which may indicate an error", + command + ))); } Ok(()) } } -struct Client {} +struct Client; #[async_trait] impl Handler for Client { @@ -306,6 +455,7 @@ pub enum Error { NetworkError(String), AuthenticationError(String), ConfigurationError(String), + TimeoutError(String), UnexpectedError(String), CommandError(String), } @@ -316,6 +466,7 @@ impl Display for Error { Error::NetworkError(msg) => write!(f, "Network error: {msg}"), Error::AuthenticationError(msg) => write!(f, "Authentication error: {msg}"), Error::ConfigurationError(msg) => write!(f, "Configuration error: {msg}"), + Error::TimeoutError(msg) => write!(f, "Timeout error: {msg}"), Error::UnexpectedError(msg) => write!(f, "Unexpected error: {msg}"), Error::CommandError(msg) => write!(f, "Command failed: {msg}"), }