Files
harmony/harmony-k8s/src/port_forward.rs
Jean-Gabriel Gill-Couture b05a341a80 feat(harmony-k8s, k3d): add exec_pod, delete_resource, port_forward, and k3d getters
harmony-k8s:
- exec_pod() and exec_pod_capture_output(): exec commands in pods by
  name (not just label), with proper stdout/stderr capture
- delete_resource<K>(): generic typed delete using ScopeResolver,
  idempotent (404 = Ok)
- port_forward(): native port forwarding via kube-rs Portforwarder +
  tokio TcpListener, replacing kubectl subprocess. Returns
  PortForwardHandle that auto-aborts on drop.

k3d:
- base_dir(), cluster_name(), context_name() public getters

Also adds tokio "net" feature to workspace for TcpListener.
2026-03-28 23:47:42 -04:00

134 lines
4.3 KiB
Rust

use std::net::SocketAddr;
use k8s_openapi::api::core::v1::Pod;
use kube::{Api, Error, error::DiscoveryError};
use log::{debug, error, info};
use tokio::net::TcpListener;
use crate::client::K8sClient;
/// Handle to a running port-forward. The forward is stopped when the handle is
/// dropped (or when [`abort`](Self::abort) is called explicitly).
pub struct PortForwardHandle {
local_addr: SocketAddr,
abort_handle: tokio::task::AbortHandle,
}
impl PortForwardHandle {
/// The local address the listener is bound to.
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
/// The local port (convenience for `local_addr().port()`).
pub fn port(&self) -> u16 {
self.local_addr.port()
}
/// Stop the port-forward and close the listener.
pub fn abort(&self) {
self.abort_handle.abort();
}
}
impl Drop for PortForwardHandle {
fn drop(&mut self) {
self.abort_handle.abort();
}
}
impl K8sClient {
/// Forward a pod port to a local TCP listener.
///
/// Binds `127.0.0.1:{local_port}` (pass 0 to let the OS pick a free port)
/// and proxies every incoming TCP connection to the pod's `remote_port`
/// through the Kubernetes API server's portforward subresource (WebSocket).
///
/// Returns a [`PortForwardHandle`] whose [`port()`](PortForwardHandle::port)
/// gives the actual bound port. The forward runs in a background task and
/// is automatically stopped when the handle is dropped.
pub async fn port_forward(
&self,
pod_name: &str,
namespace: &str,
local_port: u16,
remote_port: u16,
) -> Result<PortForwardHandle, Error> {
let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], local_port)))
.await
.map_err(|e| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Failed to bind 127.0.0.1:{local_port}: {e}"
)))
})?;
let local_addr = listener.local_addr().map_err(|e| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Failed to get local address: {e}"
)))
})?;
info!(
"Port-forward {} -> {}/{}:{}",
local_addr, namespace, pod_name, remote_port
);
let client = self.client.clone();
let ns = namespace.to_string();
let pod = pod_name.to_string();
let task = tokio::spawn(async move {
let api: Api<Pod> = Api::namespaced(client, &ns);
loop {
let (mut tcp_stream, peer) = match listener.accept().await {
Ok(conn) => conn,
Err(e) => {
debug!("Port-forward listener accept error: {e}");
break;
}
};
debug!("Port-forward connection from {peer}");
let api = api.clone();
let pod = pod.clone();
tokio::spawn(async move {
let mut pf = match api.portforward(&pod, &[remote_port]).await {
Ok(pf) => pf,
Err(e) => {
error!("Port-forward WebSocket setup failed: {e}");
return;
}
};
let mut kube_stream = match pf.take_stream(remote_port) {
Some(s) => s,
None => {
error!("Port-forward: no stream for port {remote_port}");
return;
}
};
match tokio::io::copy_bidirectional(&mut tcp_stream, &mut kube_stream).await {
Ok((from_client, from_pod)) => {
debug!(
"Port-forward connection closed ({from_client} bytes sent, {from_pod} bytes received)"
);
}
Err(e) => {
debug!("Port-forward copy error: {e}");
}
}
drop(pf);
});
}
});
Ok(PortForwardHandle {
local_addr,
abort_handle: task.abort_handle(),
})
}
}