use std::{ net::IpAddr, path::Path, sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; use tokio_stream::StreamExt; use async_trait::async_trait; use log::{debug, info, trace}; use russh::{ client::{Config, Handler, Msg}, Channel, }; use russh_keys::key; use russh_sftp::client::SftpSession; use tokio::io::AsyncWriteExt; use crate::{config::SshCredentials, Error}; use super::OPNsenseShell; use tokio::fs::read_dir; use tokio::fs::File; use tokio_util::codec::{BytesCodec, FramedRead}; #[derive(Debug)] pub struct SshOPNSenseShell { host: (IpAddr, u16), credentials: SshCredentials, ssh_config: Arc, } #[async_trait] impl OPNsenseShell for SshOPNSenseShell { async fn exec(&self, command: &str) -> Result { info!("Executing command on SshOPNSenseShell {command}"); self.run_command(command).await } async fn write_content_to_temp_file(&self, content: &str) -> Result { let temp_filename = format!( "/conf/harmony/opnsense-config-{}", SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() ); self.write_content_to_file(content, &temp_filename).await } async fn write_content_to_file(&self, content: &str, filename: &str) -> Result { let channel = self.get_ssh_channel().await?; channel .request_subsystem(true, "sftp") .await .expect("Should request sftp subsystem"); let sftp = SftpSession::new(channel.into_stream()) .await .expect("Should acquire sftp subsystem"); if let Some(parent) = Path::new(filename).parent() { if let Some(parent_str) = parent.to_str() { if !parent_str.is_empty() { self.ensure_remote_dir_exists(&sftp, parent_str).await?; } } } let mut file = sftp.create(filename).await.unwrap(); file.write_all(content.as_bytes()).await?; Ok(filename.to_string()) } async fn upload_folder(&self, source: &str, destination: &str) -> Result { let channel = self.get_ssh_channel().await?; channel .request_subsystem(true, "sftp") .await .expect("Should request sftp subsystem"); let sftp = SftpSession::new(channel.into_stream()) .await .expect("Should acquire sftp subsystem"); self.ensure_remote_dir_exists(&sftp, destination).await?; info!("Reading local directory {source}"); let mut entries = read_dir(source).await?; while let Some(entry) = entries.next_entry().await? { info!( "Checking directory entry {}", entry .path() .to_str() .expect("Directory entry should have a path : {entry:?}") ); if entry.file_type().await?.is_file() { debug!("Got a file"); let local_path = entry.path(); debug!("path {local_path:?}"); let file_name = local_path.file_name().unwrap().to_string_lossy(); let remote_path = format!("{}/{}", destination, file_name); info!( "Uploading local file {} to remote {}", local_path.to_str().unwrap_or_default(), remote_path ); debug!("Creating file {remote_path:?}"); let mut remote_file = sftp.create(remote_path.as_str()).await?; debug!("Writing file {remote_path:?}"); let local_file = File::open(&local_path).await?; let mut reader = FramedRead::new(local_file, BytesCodec::new()); while let Some(result) = reader.next().await { match result { Ok(bytes) => { if !bytes.is_empty() { AsyncWriteExt::write_all(&mut remote_file, &bytes).await?; } } Err(e) => todo!("Error unhandled {e}"), }; } } else if entry.file_type().await?.is_dir() { let sub_source = entry.path(); let sub_destination = format!("{}/{}", destination, entry.file_name().to_string_lossy()); self.upload_folder(sub_source.to_str().unwrap(), &sub_destination) .await?; } } Ok(destination.to_string()) } } impl SshOPNSenseShell { pub async fn get_ssh_channel(&self) -> Result, Error> { let mut ssh = russh::client::connect(self.ssh_config.clone(), self.host, Client {}).await?; match &self.credentials { SshCredentials::SshKey { username, key } => { ssh.authenticate_publickey(username, key.clone()).await?; } SshCredentials::Password { username, password } => { ssh.authenticate_password(username, password).await?; } } Ok(ssh.channel_open_session().await?) } async fn run_command(&self, command: &str) -> Result { debug!("Running ssh command {command}"); let mut channel = self.get_ssh_channel().await?; channel.exec(true, command).await?; wait_for_completion(&mut channel).await } async fn ensure_remote_dir_exists(&self, sftp: &SftpSession, path: &str) -> Result<(), Error> { if !sftp.try_exists(path).await? { info!("Creating remote directory {path}"); sftp.create_dir(path).await?; } Ok(()) } pub fn new(host: (IpAddr, u16), credentials: SshCredentials, ssh_config: Arc) -> Self { info!("Initializing SshOPNSenseShell on host {host:?}"); Self { host, credentials, ssh_config, } } } struct Client {} #[async_trait] impl Handler for Client { type Error = Error; async fn check_server_key( &mut self, _server_public_key: &key::PublicKey, ) -> Result { Ok(true) } } async fn wait_for_completion(channel: &mut Channel) -> Result { let mut output = Vec::new(); loop { let Some(msg) = channel.wait().await else { break; }; match msg { russh::ChannelMsg::ExtendedData { ref data, .. } | russh::ChannelMsg::Data { ref data } => { output.append(&mut data.to_vec()); } russh::ChannelMsg::ExitStatus { exit_status } => { if exit_status != 0 { return Err(Error::Command(format!( "Command failed with exit status {exit_status}, output {}", String::from_utf8(output).unwrap_or_default() ))); } } russh::ChannelMsg::Success | russh::ChannelMsg::WindowAdjusted { .. } | russh::ChannelMsg::Eof => {} _ => { return Err(Error::Unexpected(format!( "Russh got unexpected msg {msg:?}" ))) } } } let output = String::from_utf8(output).expect("Output should be UTF-8 compatible"); trace!("{output}"); Ok(output) }