harmony-k8s: - exec_pod() and exec_pod_capture_output(): exec commands in pods by name (not just label), with proper stdout/stderr capture - delete_resource<K>(): generic typed delete using ScopeResolver, idempotent (404 = Ok) - port_forward(): native port forwarding via kube-rs Portforwarder + tokio TcpListener, replacing kubectl subprocess. Returns PortForwardHandle that auto-aborts on drop. k3d: - base_dir(), cluster_name(), context_name() public getters Also adds tokio "net" feature to workspace for TcpListener.
134 lines
4.3 KiB
Rust
134 lines
4.3 KiB
Rust
use std::net::SocketAddr;
|
|
|
|
use k8s_openapi::api::core::v1::Pod;
|
|
use kube::{Api, Error, error::DiscoveryError};
|
|
use log::{debug, error, info};
|
|
use tokio::net::TcpListener;
|
|
|
|
use crate::client::K8sClient;
|
|
|
|
/// Handle to a running port-forward. The forward is stopped when the handle is
|
|
/// dropped (or when [`abort`](Self::abort) is called explicitly).
|
|
pub struct PortForwardHandle {
|
|
local_addr: SocketAddr,
|
|
abort_handle: tokio::task::AbortHandle,
|
|
}
|
|
|
|
impl PortForwardHandle {
|
|
/// The local address the listener is bound to.
|
|
pub fn local_addr(&self) -> SocketAddr {
|
|
self.local_addr
|
|
}
|
|
|
|
/// The local port (convenience for `local_addr().port()`).
|
|
pub fn port(&self) -> u16 {
|
|
self.local_addr.port()
|
|
}
|
|
|
|
/// Stop the port-forward and close the listener.
|
|
pub fn abort(&self) {
|
|
self.abort_handle.abort();
|
|
}
|
|
}
|
|
|
|
impl Drop for PortForwardHandle {
|
|
fn drop(&mut self) {
|
|
self.abort_handle.abort();
|
|
}
|
|
}
|
|
|
|
impl K8sClient {
|
|
/// Forward a pod port to a local TCP listener.
|
|
///
|
|
/// Binds `127.0.0.1:{local_port}` (pass 0 to let the OS pick a free port)
|
|
/// and proxies every incoming TCP connection to the pod's `remote_port`
|
|
/// through the Kubernetes API server's portforward subresource (WebSocket).
|
|
///
|
|
/// Returns a [`PortForwardHandle`] whose [`port()`](PortForwardHandle::port)
|
|
/// gives the actual bound port. The forward runs in a background task and
|
|
/// is automatically stopped when the handle is dropped.
|
|
pub async fn port_forward(
|
|
&self,
|
|
pod_name: &str,
|
|
namespace: &str,
|
|
local_port: u16,
|
|
remote_port: u16,
|
|
) -> Result<PortForwardHandle, Error> {
|
|
let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], local_port)))
|
|
.await
|
|
.map_err(|e| {
|
|
Error::Discovery(DiscoveryError::MissingResource(format!(
|
|
"Failed to bind 127.0.0.1:{local_port}: {e}"
|
|
)))
|
|
})?;
|
|
|
|
let local_addr = listener.local_addr().map_err(|e| {
|
|
Error::Discovery(DiscoveryError::MissingResource(format!(
|
|
"Failed to get local address: {e}"
|
|
)))
|
|
})?;
|
|
|
|
info!(
|
|
"Port-forward {} -> {}/{}:{}",
|
|
local_addr, namespace, pod_name, remote_port
|
|
);
|
|
|
|
let client = self.client.clone();
|
|
let ns = namespace.to_string();
|
|
let pod = pod_name.to_string();
|
|
|
|
let task = tokio::spawn(async move {
|
|
let api: Api<Pod> = Api::namespaced(client, &ns);
|
|
loop {
|
|
let (mut tcp_stream, peer) = match listener.accept().await {
|
|
Ok(conn) => conn,
|
|
Err(e) => {
|
|
debug!("Port-forward listener accept error: {e}");
|
|
break;
|
|
}
|
|
};
|
|
|
|
debug!("Port-forward connection from {peer}");
|
|
|
|
let api = api.clone();
|
|
let pod = pod.clone();
|
|
tokio::spawn(async move {
|
|
let mut pf = match api.portforward(&pod, &[remote_port]).await {
|
|
Ok(pf) => pf,
|
|
Err(e) => {
|
|
error!("Port-forward WebSocket setup failed: {e}");
|
|
return;
|
|
}
|
|
};
|
|
|
|
let mut kube_stream = match pf.take_stream(remote_port) {
|
|
Some(s) => s,
|
|
None => {
|
|
error!("Port-forward: no stream for port {remote_port}");
|
|
return;
|
|
}
|
|
};
|
|
|
|
match tokio::io::copy_bidirectional(&mut tcp_stream, &mut kube_stream).await {
|
|
Ok((from_client, from_pod)) => {
|
|
debug!(
|
|
"Port-forward connection closed ({from_client} bytes sent, {from_pod} bytes received)"
|
|
);
|
|
}
|
|
Err(e) => {
|
|
debug!("Port-forward copy error: {e}");
|
|
}
|
|
}
|
|
|
|
drop(pf);
|
|
});
|
|
}
|
|
});
|
|
|
|
Ok(PortForwardHandle {
|
|
local_addr,
|
|
abort_handle: task.abort_handle(),
|
|
})
|
|
}
|
|
}
|