diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 03720c6..966cc5d 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -53,6 +53,13 @@ jobs: push: ${{ github.event_name != 'pull_request' }} tags: ${{ steps.meta.outputs.tags }} labels: ${{ steps.meta.outputs.labels }} + # Enable Kubernetes pod/container attribution in the container image. + # It is dependency-free and runtime-gated (`--kubernetes auto`, the + # default, is inert outside a pod), so a single image serves both + # general container use and kubectl-rustnet. Native installs + # (cargo/brew/deb/rpm) leave the feature off and stay lean. + build-args: | + CARGO_FEATURES=kubernetes cache-from: type=gha cache-to: type=gha,mode=max diff --git a/Cargo.toml b/Cargo.toml index a2727c0..22511d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,6 +107,9 @@ linux-default = ["ebpf"] # Deprecated: kept for backwards compatibility ebpf = ["libbpf-rs", "dep:libbpf-cargo"] landlock = ["dep:landlock", "dep:caps"] macos-sandbox = [] +# Kubernetes pod and container resolution from cgroup paths and (Phase 2) the +# on-disk kubelet pods directory. Linux-only at runtime; no extra dependencies. +kubernetes = [] # Minimal cross configuration to override dependency conflicts [workspace.metadata.cross.build.env] diff --git a/Dockerfile b/Dockerfile index 5a12fe5..454530c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -33,8 +33,16 @@ COPY benches ./benches COPY assets/services ./assets/services COPY assets/oui.gz ./assets/oui.gz -# Build the application in release mode (eBPF is enabled by default on Linux) -RUN cargo build --release +# Build the application in release mode (eBPF is enabled by default on Linux). +# Optional features can be added with --build-arg CARGO_FEATURES=kubernetes +# (additive: default features stay on). The CI Kubernetes image variant passes +# CARGO_FEATURES=kubernetes; the default image leaves it empty. 
+ARG CARGO_FEATURES="" +RUN if [ -n "$CARGO_FEATURES" ]; then \ + cargo build --release --features "$CARGO_FEATURES"; \ + else \ + cargo build --release; \ + fi # Runtime stage - use trixie-slim to match GLIBC version from builder FROM debian:trixie-slim diff --git a/ROADMAP.md b/ROADMAP.md index e065be3..2e8b78c 100644 --- a/ROADMAP.md +++ b/ROADMAP.md @@ -49,6 +49,7 @@ The experimental eBPF support provides efficient process identification but has - **Real-time Process Updates**: Track process name changes and executable updates - **Container Support**: Better process identification within containerized environments - **Security Context**: Include process security attributes (capabilities, SELinux context, etc.) +- **Cross-Namespace Attribution for Kubernetes**: The current procfs fallback reads `/proc/net/tcp` from the reader's network namespace, so under `hostNetwork: true` (as used by kubectl-rustnet) it never sees sockets owned by pods in their own netns. The kubernetes feature ships a scoped per-PID `/proc/<pid>/net/{tcp,tcp6,udp,udp6}` walker that covers TCP+UDP for kubepods PIDs, but it ticks at the enrichment interval and so misses sub-tick ephemeral flows. The complete fix lives in the eBPF layer: kprobes/fentry are netns-agnostic and fire at `connect()`/`accept()` time, but the current socket-tracker map is being pruned more aggressively than userspace can consume. Plan: extend map retention (or switch to a ring buffer of close events that userspace drains opportunistically), debug the "Map Lookup Miss" path under Kubernetes traffic patterns, and verify cross-namespace coverage end-to-end in a kind cluster. This work also benefits ICMP and raw-socket attribution, which procfs cannot reach. ## Features diff --git a/examples/k8s_resolver_check.rs b/examples/k8s_resolver_check.rs new file mode 100644 index 0000000..91f849a --- /dev/null +++ b/examples/k8s_resolver_check.rs @@ -0,0 +1,54 @@ +//! 
Smoke-test binary: takes a PID on the command line, runs the Kubernetes +//! resolver against `/proc//cgroup`, and prints the resolver's view. +//! Intended for running inside a `hostPID: true` pod on a Kubernetes node: +//! +//! cargo build --release --features kubernetes --example k8s_resolver_check +//! ./k8s_resolver_check $(pidof nginx | awk '{print $1}') + +#[cfg(feature = "kubernetes")] +fn main() { + let pid: u32 = std::env::args() + .nth(1) + .expect("usage: k8s_resolver_check ") + .parse() + .expect("PID must be an integer"); + + let path = format!("/proc/{pid}/cgroup"); + let raw = std::fs::read_to_string(&path).unwrap_or_else(|e| { + eprintln!("read {path}: {e}"); + std::process::exit(2); + }); + println!("--- /proc/{pid}/cgroup ---"); + println!("{raw}"); + + let resolver = rustnet_monitor::network::kubernetes::KubernetesResolver::new(); + match resolver.enrich(pid) { + Some(info) => { + println!("--- resolver output ---"); + println!("{info:#?}"); + // Mirror the JSON shape that log_connection_event would emit. 
+ let mut obj = serde_json::Map::new(); + if let Some(v) = info.pod_uid { + obj.insert("pod_uid".into(), serde_json::json!(v)); + } + if let Some(v) = info.container_id { + obj.insert("container_id".into(), serde_json::json!(v)); + } + if let Some(v) = info.cgroup_path { + obj.insert("cgroup_path".into(), serde_json::json!(v)); + } + println!("--- JSONL 'kubernetes' block ---"); + println!("{}", serde_json::to_string(&obj).unwrap()); + } + None => { + println!("--- resolver returned None (no kubepods cgroup) ---"); + std::process::exit(1); + } + } +} + +#[cfg(not(feature = "kubernetes"))] +fn main() { + eprintln!("Build with --features kubernetes"); + std::process::exit(2); +} diff --git a/src/app.rs b/src/app.rs index b75acdb..c2e1846 100644 --- a/src/app.rs +++ b/src/app.rs @@ -204,6 +204,33 @@ fn log_connection_event( event["process_name"] = json!(process_name); } + // Add Kubernetes attribution if the process is part of a pod + #[cfg(feature = "kubernetes")] + if let Some(k8s) = &conn.k8s_info { + let mut obj = serde_json::Map::new(); + if let Some(v) = &k8s.pod_uid { + obj.insert("pod_uid".into(), json!(v)); + } + if let Some(v) = &k8s.pod_name { + obj.insert("pod_name".into(), json!(v)); + } + if let Some(v) = &k8s.pod_namespace { + obj.insert("pod_namespace".into(), json!(v)); + } + if let Some(v) = &k8s.container_id { + obj.insert("container_id".into(), json!(v)); + } + if let Some(v) = &k8s.container_name { + obj.insert("container_name".into(), json!(v)); + } + if let Some(v) = &k8s.cgroup_path { + obj.insert("cgroup_path".into(), json!(v)); + } + if !obj.is_empty() { + event["kubernetes"] = serde_json::Value::Object(obj); + } + } + // Add service name if available if let Some(service_name) = &conn.service_name { event["service_name"] = json!(service_name); @@ -306,6 +333,33 @@ fn log_pcap_connection(pcap_path: &str, conn: &Connection) { "state": conn.state(), }); + // Add Kubernetes attribution if the process is part of a pod + #[cfg(feature = 
"kubernetes")] + if let Some(k8s) = &conn.k8s_info { + let mut obj = serde_json::Map::new(); + if let Some(v) = &k8s.pod_uid { + obj.insert("pod_uid".into(), json!(v)); + } + if let Some(v) = &k8s.pod_name { + obj.insert("pod_name".into(), json!(v)); + } + if let Some(v) = &k8s.pod_namespace { + obj.insert("pod_namespace".into(), json!(v)); + } + if let Some(v) = &k8s.container_id { + obj.insert("container_id".into(), json!(v)); + } + if let Some(v) = &k8s.container_name { + obj.insert("container_name".into(), json!(v)); + } + if let Some(v) = &k8s.cgroup_path { + obj.insert("cgroup_path".into(), json!(v)); + } + if !obj.is_empty() { + event["kubernetes"] = serde_json::Value::Object(obj); + } + } + // Only add GeoIP fields when they have actual values if let Some(ref geoip) = conn.geoip_info { if let Some(ref cc) = geoip.country_code { @@ -364,6 +418,9 @@ pub struct Config { pub geoip_city_path: Option, /// Disable GeoIP lookups entirely pub disable_geoip: bool, + /// Kubernetes pod/container attribution mode + #[cfg(feature = "kubernetes")] + pub kubernetes_mode: crate::network::kubernetes::KubernetesMode, } impl Default for Config { @@ -382,6 +439,8 @@ impl Default for Config { geoip_asn_path: None, geoip_city_path: None, disable_geoip: false, + #[cfg(feature = "kubernetes")] + kubernetes_mode: crate::network::kubernetes::KubernetesMode::default(), } } } @@ -1036,6 +1095,8 @@ impl App { let pktap_active = Arc::clone(&self.pktap_active); let should_stop = Arc::clone(&self.should_stop); let process_detection_status = Arc::clone(&self.process_detection_status); + #[cfg(feature = "kubernetes")] + let kubernetes_mode = self.config.kubernetes_mode; thread::Builder::new() .name("process-enrichment".to_string()) @@ -1089,6 +1150,8 @@ impl App { pktap_active, process_detection_status, process_ready_tx, + #[cfg(feature = "kubernetes")] + kubernetes_mode, ) { error!("Process enrichment thread failed: {}", e); } @@ -1105,6 +1168,7 @@ impl App { pktap_active: Arc, 
process_detection_status: Arc>, process_ready_tx: std::sync::mpsc::SyncSender<()>, + #[cfg(feature = "kubernetes")] kubernetes_mode: crate::network::kubernetes::KubernetesMode, ) -> Result<()> { use crate::network::platform::DegradationReason; @@ -1113,6 +1177,25 @@ impl App { let process_lookup = create_process_lookup(use_pktap)?; + // Kubernetes pod/container attribution. `auto` enables only when rustnet + // is itself running inside a pod, so the resolver and the cross-namespace + // socket table are only built when enabled and non-Kubernetes hosts do + // no extra /proc work. The table stays empty when disabled. + #[cfg(feature = "kubernetes")] + let kubernetes_resolver = kubernetes_mode + .enabled() + .then(crate::network::kubernetes::KubernetesResolver::new); + #[cfg(feature = "kubernetes")] + if kubernetes_resolver.is_some() { + info!("Kubernetes pod/container attribution enabled"); + } + #[cfg(feature = "kubernetes")] + let mut k8s_socket_table = crate::network::kubernetes::KubernetesSocketTable::empty(); + #[cfg(feature = "kubernetes")] + if let Some(resolver) = &kubernetes_resolver { + k8s_socket_table = crate::network::kubernetes::KubernetesSocketTable::build(resolver); + } + // Signal that process detection (including eBPF loading) is complete. // The main thread waits for this before dropping eBPF capabilities. let _ = process_ready_tx.send(()); @@ -1163,6 +1246,14 @@ impl App { if let Err(e) = process_lookup.refresh() { debug!("Process lookup refresh failed: {}", e); } + // Refresh pod-name metadata and rebuild the cross-namespace + // socket table on the same cadence. + #[cfg(feature = "kubernetes")] + if let Some(resolver) = &kubernetes_resolver { + resolver.refresh_metadata(); + k8s_socket_table = + crate::network::kubernetes::KubernetesSocketTable::build(resolver); + } last_refresh = Instant::now(); } @@ -1217,6 +1308,36 @@ impl App { if did_enrich { enriched += 1; } + + // Look up Kubernetes pod/container metadata for the PID. 
+ // Cheap after the first hit per PID (cached in the resolver). + #[cfg(feature = "kubernetes")] + if let Some(resolver) = &kubernetes_resolver + && entry.k8s_info.is_none() + && let Some(k8s) = resolver.enrich(pid) + { + entry.k8s_info = Some(k8s); + } + } else { + // The primary lookup couldn't attribute this connection. + // Under hostNetwork, that includes every pod-owned socket + // living in another network namespace. The socket table + // walks per-PID /proc//net/* (netns-aware) for kubepods + // PIDs and matches the 4-tuple, yielding both the PID and + // its pod/container metadata. + #[cfg(feature = "kubernetes")] + if entry.pid.is_none() + && let Some((pid, k8s)) = k8s_socket_table.lookup_connection(&entry) + { + entry.pid = Some(pid); + if entry.process_name.is_none() { + entry.process_name = crate::network::kubernetes::read_process_name(pid); + } + if entry.k8s_info.is_none() { + entry.k8s_info = Some(k8s); + } + enriched += 1; + } } } diff --git a/src/cli.rs b/src/cli.rs index 9417237..5da0a27 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -138,6 +138,19 @@ pub fn build_cli() -> Command { .action(clap::ArgAction::SetTrue), ); + #[cfg(feature = "kubernetes")] + let cmd = cmd.arg( + Arg::new("kubernetes") + .long("kubernetes") + .value_name("MODE") + .help( + "Kubernetes pod/container attribution: \"auto\" (enable only when running inside a pod), \"on\" (always), or \"off\"", + ) + .value_parser(["auto", "on", "off"]) + .default_value("auto") + .required(false), + ); + #[cfg(any( target_os = "linux", target_os = "windows", diff --git a/src/filter.rs b/src/filter.rs index d8d77f6..5b67eb1 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -1,5 +1,7 @@ //! Vim/fzf-style connection filter: parses `port:`, `src:`, `dst:`, -//! `sni:`, `process:`, `state:`, `proto:` keyword expressions (with +//! `sni:`, `process:`, `state:`, `proto:`, (and `pod:`, `ns:`, +//! `container:` when the `kubernetes` feature is enabled) keyword +//! expressions (with //! 
optional `(?i)…` regex literals via `regex-lite`) and matches them //! against live `Connection` records. @@ -52,6 +54,15 @@ pub enum FilterCriteria { Application(FilterValue), /// Match connection state (e.g., ESTABLISHED, SYN_RECV) State(FilterValue), + /// Match Kubernetes pod name or UID + #[cfg(feature = "kubernetes")] + Pod(FilterValue), + /// Match Kubernetes pod namespace + #[cfg(feature = "kubernetes")] + Namespace(FilterValue), + /// Match Kubernetes container name or ID + #[cfg(feature = "kubernetes")] + Container(FilterValue), } pub struct ConnectionFilter { @@ -160,6 +171,18 @@ impl ConnectionFilter { "state" => { criteria.push(FilterCriteria::State(parse_filter_value(&value))); } + #[cfg(feature = "kubernetes")] + "pod" => { + criteria.push(FilterCriteria::Pod(parse_filter_value(&value))); + } + #[cfg(feature = "kubernetes")] + "ns" | "namespace" => { + criteria.push(FilterCriteria::Namespace(parse_filter_value(&value))); + } + #[cfg(feature = "kubernetes")] + "container" | "cont" => { + criteria.push(FilterCriteria::Container(parse_filter_value(&value))); + } _ => { // Unknown keyword, treat as general search criteria.push(FilterCriteria::General(parse_filter_value( @@ -215,6 +238,24 @@ impl ConnectionFilter { FilterCriteria::Sni(fv) => self.matches_sni(connection, fv), FilterCriteria::Application(fv) => self.matches_application(connection, fv), FilterCriteria::State(fv) => match_text(&connection.state(), fv), + #[cfg(feature = "kubernetes")] + FilterCriteria::Pod(fv) => connection.k8s_info.as_ref().is_some_and(|k| { + k.pod_name.as_deref().is_some_and(|n| match_text(n, fv)) + || k.pod_uid.as_deref().is_some_and(|u| match_text(u, fv)) + }), + #[cfg(feature = "kubernetes")] + FilterCriteria::Namespace(fv) => connection.k8s_info.as_ref().is_some_and(|k| { + k.pod_namespace + .as_deref() + .is_some_and(|ns| match_text(ns, fv)) + }), + #[cfg(feature = "kubernetes")] + FilterCriteria::Container(fv) => connection.k8s_info.as_ref().is_some_and(|k| { + 
k.container_name + .as_deref() + .is_some_and(|n| match_text(n, fv)) + || k.container_id.as_deref().is_some_and(|c| match_text(c, fv)) + }), }) } @@ -782,4 +823,57 @@ mod tests { _ => panic!("Expected fallback to Partial for invalid regex"), } } + + #[cfg(feature = "kubernetes")] + #[test] + fn test_kubernetes_filter_keywords() { + use crate::network::types::*; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + + let mut conn = Connection::new( + Protocol::Tcp, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 80), + ProtocolState::Tcp(TcpState::Established), + ); + conn.k8s_info = Some(K8sInfo { + pod_uid: Some("c3b4d893-473e-43c2-8013-8ee2955a4630".to_string()), + pod_name: Some("nginx-86644db9cc-mf5lx".to_string()), + pod_namespace: Some("demo-traffic".to_string()), + container_id: Some( + "c16c7605305c854d8582a1db3d5bb3c4b6c89a08e914223e9d500682b3fb0b1b".to_string(), + ), + container_name: Some("nginx".to_string()), + cgroup_path: None, + }); + + // pod: matches by name (substring) + assert!(ConnectionFilter::parse("pod:nginx").matches(&conn)); + // pod: matches by UID prefix + assert!(ConnectionFilter::parse("pod:c3b4d893").matches(&conn)); + // ns: matches namespace + assert!(ConnectionFilter::parse("ns:demo-traffic").matches(&conn)); + assert!(ConnectionFilter::parse("namespace:demo").matches(&conn)); + // container: matches by name and by ID prefix + assert!(ConnectionFilter::parse("container:nginx").matches(&conn)); + assert!(ConnectionFilter::parse("container:c16c7605").matches(&conn)); + // Negative cases + assert!(!ConnectionFilter::parse("pod:redis").matches(&conn)); + assert!(!ConnectionFilter::parse("ns:kube-system").matches(&conn)); + assert!(!ConnectionFilter::parse("container:redis").matches(&conn)); + + // Combined filter + assert!(ConnectionFilter::parse("ns:demo-traffic pod:nginx").matches(&conn)); + + // Filter on connections without K8s info returns no match for any pod 
filter. + let bare = Connection::new( + Protocol::Tcp, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 80), + ProtocolState::Tcp(TcpState::Established), + ); + assert!(!ConnectionFilter::parse("pod:nginx").matches(&bare)); + assert!(!ConnectionFilter::parse("ns:demo").matches(&bare)); + assert!(!ConnectionFilter::parse("container:nginx").matches(&bare)); + } } diff --git a/src/main.rs b/src/main.rs index a000096..2aebbed 100644 --- a/src/main.rs +++ b/src/main.rs @@ -119,6 +119,15 @@ fn main() -> Result<()> { info!("Using GeoIP City database: {}", city_path); } + // Kubernetes pod/container attribution mode (values validated by clap) + #[cfg(feature = "kubernetes")] + if let Some(mode) = matches.get_one::("kubernetes") + && let Some(parsed) = network::kubernetes::KubernetesMode::parse(mode) + { + config.kubernetes_mode = parsed; + info!("Kubernetes attribution mode: {}", mode); + } + // Set up terminal let backend = CrosstermBackend::new(io::stdout()); let mut terminal = ui::setup_terminal(backend)?; @@ -185,11 +194,34 @@ fn main() -> Result<()> { }; // Collect read paths (GeoIP databases) + #[cfg(not(feature = "kubernetes"))] let read_paths: Vec = GeoIpResolver::get_search_paths() .into_iter() .filter(|p| p.exists()) .collect(); + // When Kubernetes attribution is enabled, the resolver also reads pod + // and container names from the kubelet log directories. /proc is + // already granted below for process lookup; these need explicit read + // access or the periodic metadata refresh would be denied once Landlock + // applies. 
+ #[cfg(feature = "kubernetes")] + let read_paths: Vec = { + let mut paths: Vec = GeoIpResolver::get_search_paths() + .into_iter() + .filter(|p| p.exists()) + .collect(); + if config.kubernetes_mode.enabled() { + for dir in ["/var/log/containers", "/var/log/pods"] { + let pb = PathBuf::from(dir); + if pb.exists() { + paths.push(pb); + } + } + } + paths + }; + let mut write_paths = Vec::new(); // Add logs directory if logging is enabled diff --git a/src/network/kubernetes.rs b/src/network/kubernetes.rs new file mode 100644 index 0000000..25e6c3a --- /dev/null +++ b/src/network/kubernetes.rs @@ -0,0 +1,1000 @@ +// network/kubernetes.rs - Kubernetes pod/container resolution +// +// Parses `/proc//cgroup` to recover the pod UID and container ID of the +// process owning a connection. The pure parser (`parse_cgroup`) is +// cross-platform so it stays unit-testable on non-Linux developer machines; +// the procfs reader (`lookup_for_pid`) is Linux-only. +// +// Recognised cgroup layouts: +// - cgroup v1 systemd: +// `.../kubepods.slice/kubepods-.slice/kubepods--pod.slice/-.scope` +// - cgroup v2 unified: +// `0::/kubepods//pod/` or `.../pod/-.scope` +// - Runtime prefixes stripped from the container ID: `cri-containerd-`, +// `crio-`, `docker-`. Bare 64-hex IDs are also accepted. +// +// Pod UID normalisation: Kubernetes pod UIDs are UUIDs (8-4-4-4-12 hex). systemd +// encodes them with underscores instead of hyphens. The parser yields the +// hyphenated, lowercase canonical form so callers can compare against +// `kubectl get pod ... metadata.uid` directly. + +/// Raw data recovered from a process's cgroup membership before pairing with +/// pod metadata (which lives in `K8sInfo`). The parser only runs on Linux at +/// runtime, but the pure logic is also compiled in tests so it stays +/// exercisable on developer machines. 
+#[cfg(any(test, target_os = "linux"))]
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CgroupInfo {
+    pub pod_uid: Option<String>,
+    pub container_id: Option<String>,
+    pub cgroup_path: String,
+}
+
+/// Parse the contents of a `/proc/<pid>/cgroup` file and return the
+/// kubepods-related info if any line refers to a Kubernetes pod. Returns
+/// `None` for processes that aren't part of a pod cgroup.
+#[cfg(any(test, target_os = "linux"))]
+pub fn parse_cgroup(contents: &str) -> Option<CgroupInfo> {
+    let path = contents
+        .lines()
+        .map(extract_path)
+        .find(|p| p.contains("kubepods"))?;
+
+    let pod_uid = path.split('/').find_map(extract_pod_uid);
+    let container_id = path.rsplit('/').find_map(extract_container_id);
+
+    Some(CgroupInfo {
+        pod_uid,
+        container_id,
+        cgroup_path: path.to_string(),
+    })
+}
+
+/// Read `/proc/<pid>/cgroup` and parse it. Returns `None` if the file is
+/// unreadable (PID gone, permissions) or the process isn't in a pod cgroup.
+#[cfg(target_os = "linux")]
+pub fn lookup_for_pid(pid: u32) -> Option<CgroupInfo> {
+    let contents = std::fs::read_to_string(format!("/proc/{pid}/cgroup")).ok()?;
+    parse_cgroup(&contents)
+}
+
+/// Runtime control for Kubernetes pod/container attribution.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
+pub enum KubernetesMode {
+    /// Enable only when rustnet is itself running inside a pod (the common
+    /// `kubectl rustnet` case). A no-op on ordinary hosts, so there is no
+    /// wasted `/proc` scanning when not in Kubernetes.
+    #[default]
+    Auto,
+    /// Always attempt attribution (e.g. running directly on a node).
+    On,
+    /// Never attempt attribution.
+    Off,
+}
+
+impl KubernetesMode {
+    /// Parse the `--kubernetes` flag value. Returns `None` for unknown input.
+    pub fn parse(value: &str) -> Option<Self> {
+        match value.to_ascii_lowercase().as_str() {
+            "auto" => Some(Self::Auto),
+            "on" => Some(Self::On),
+            "off" => Some(Self::Off),
+            _ => None,
+        }
+    }
+
+    /// Whether attribution should run, resolving `Auto` by pod detection.
+    pub fn enabled(self) -> bool {
+        match self {
+            Self::On => true,
+            Self::Off => false,
+            Self::Auto => running_in_pod(),
+        }
+    }
+}
+
+/// Whether the current process is running inside a Kubernetes pod. Uses the
+/// `KUBERNETES_SERVICE_HOST` env var, which the kubelet injects into every pod
+/// (the same signal client-go uses for in-cluster detection). This is reliable
+/// regardless of cgroup namespacing, unlike inspecting `/proc/self/cgroup`,
+/// which a namespaced pod sees as just `/`.
+pub fn running_in_pod() -> bool {
+    std::env::var_os("KUBERNETES_SERVICE_HOST").is_some()
+}
+
+/// Resolves a PID to its pod and container metadata.
+///
+/// The cgroup-derived part (pod UID, container ID, cgroup path) is cached per
+/// PID since cgroup membership is stable for the life of a PID. Entries for
+/// PIDs that have exited are dropped on every [`refresh_metadata`] call, so the
+/// cache stays bounded and a recycled PID is re-resolved instead of inheriting
+/// the previous owner's pod. Human-readable
+/// names come from a separately-refreshed [`PodMetadata`] table and are merged
+/// in on every lookup, so a pod's name appears as soon as the next metadata
+/// refresh observes it.
+///
+/// [`refresh_metadata`]: KubernetesResolver::refresh_metadata
+pub struct KubernetesResolver {
+    cache: dashmap::DashMap<u32, crate::network::types::K8sInfo>,
+    metadata: std::sync::RwLock<PodMetadata>,
+}
+
+impl KubernetesResolver {
+    /// Create a resolver with an empty PID cache and a freshly loaded
+    /// [`PodMetadata`] table.
+    pub fn new() -> Self {
+        Self {
+            cache: dashmap::DashMap::new(),
+            metadata: std::sync::RwLock::new(PodMetadata::load()),
+        }
+    }
+
+    /// Look up pod and container info for a process. On non-Linux platforms
+    /// always returns `None` so callers can stay platform-agnostic.
+    pub fn enrich(&self, pid: u32) -> Option<crate::network::types::K8sInfo> {
+        let mut info = if let Some(cached) = self.cache.get(&pid) {
+            cached.clone()
+        } else {
+            let base = self.fetch(pid)?;
+            self.cache.insert(pid, base.clone());
+            base
+        };
+        // A poisoned lock only costs the human-readable names; the
+        // cgroup-derived fields are still returned.
+        if let Ok(meta) = self.metadata.read() {
+            meta.apply(&mut info);
+        }
+        Some(info)
+    }
+
+    /// Reload the on-disk pod metadata table and evict cached entries whose
+    /// PID no longer exists. Cheap (two directory reads plus one existence
+    /// check per cached PID) and safe to call on the enrichment refresh tick.
+    pub fn refresh_metadata(&self) {
+        if let Ok(mut meta) = self.metadata.write() {
+            *meta = PodMetadata::load();
+        }
+        // Without eviction the cache grows for as long as pods churn on the
+        // node, and a recycled PID would keep the previous owner's pod data.
+        // The cache is only ever populated on Linux, so this is a no-op
+        // elsewhere.
+        self.cache
+            .retain(|pid, _| std::path::Path::new(&format!("/proc/{pid}")).exists());
+    }
+
+    #[cfg(target_os = "linux")]
+    fn fetch(&self, pid: u32) -> Option<crate::network::types::K8sInfo> {
+        let cg = lookup_for_pid(pid)?;
+        Some(crate::network::types::K8sInfo {
+            pod_uid: cg.pod_uid,
+            container_id: cg.container_id,
+            cgroup_path: Some(cg.cgroup_path),
+            pod_name: None,
+            pod_namespace: None,
+            container_name: None,
+        })
+    }
+
+    #[cfg(not(target_os = "linux"))]
+    fn fetch(&self, _pid: u32) -> Option<crate::network::types::K8sInfo> {
+        None
+    }
+}
+
+impl Default for KubernetesResolver {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Pod and container names indexed for enrichment, sourced from the
+/// kubelet-managed log directories (runtime-agnostic, no auth, no gRPC):
+///
+/// - `/var/log/containers/<pod>_<namespace>_<container>-<container-id>.log` symlinks
+///   give container ID -> (pod name, namespace, container name).
+/// - `/var/log/pods/<namespace>_<pod>_<uid>/` directories give pod UID ->
+///   (pod name, namespace), used as a fallback when a connection is
+///   attributed to a pod's sandbox container (which has no log symlink).
+#[derive(Debug, Default)]
+pub struct PodMetadata {
+    by_container_id: std::collections::HashMap<String, ContainerMeta>,
+    by_pod_uid: std::collections::HashMap<String, PodMeta>,
+}
+
+/// Names recovered from one `/var/log/containers/` symlink.
+#[derive(Debug, Clone)]
+struct ContainerMeta {
+    pod_name: String,
+    namespace: String,
+    container_name: String,
+}
+
+/// Names recovered from one `/var/log/pods/` directory.
+#[derive(Debug, Clone)]
+struct PodMeta {
+    pod_name: String,
+    namespace: String,
+}
+
+impl PodMetadata {
+    /// Load the metadata table from the kubelet log directories. Linux-only;
+    /// returns an empty table on other platforms or when the directories are
+    /// not present (e.g. not running on a node, or the mount is missing).
+    #[cfg(target_os = "linux")]
+    pub fn load() -> Self {
+        let mut by_container_id = std::collections::HashMap::new();
+        if let Ok(dir) = std::fs::read_dir("/var/log/containers") {
+            for entry in dir.flatten() {
+                let name = entry.file_name();
+                let name = match name.to_str() {
+                    Some(s) => s,
+                    None => continue,
+                };
+                if let Some((cid, meta)) = parse_container_log_name(name) {
+                    by_container_id.insert(cid, meta);
+                }
+            }
+        }
+
+        let mut by_pod_uid = std::collections::HashMap::new();
+        if let Ok(dir) = std::fs::read_dir("/var/log/pods") {
+            for entry in dir.flatten() {
+                let name = entry.file_name();
+                let name = match name.to_str() {
+                    Some(s) => s,
+                    None => continue,
+                };
+                if let Some((uid, meta)) = parse_pod_dir_name(name) {
+                    by_pod_uid.insert(uid, meta);
+                }
+            }
+        }
+
+        Self {
+            by_container_id,
+            by_pod_uid,
+        }
+    }
+
+    #[cfg(not(target_os = "linux"))]
+    pub fn load() -> Self {
+        Self::default()
+    }
+
+    /// Fill in the human-readable name fields of a [`K8sInfo`] from the table.
+    /// Container-ID match (precise) is preferred; pod-UID match is the fallback
+    /// for connections attributed to a pod's sandbox container.
+    fn apply(&self, info: &mut crate::network::types::K8sInfo) {
+        if let Some(cid) = &info.container_id
+            && let Some(meta) = self.by_container_id.get(cid)
+        {
+            info.pod_name = Some(meta.pod_name.clone());
+            info.pod_namespace = Some(meta.namespace.clone());
+            info.container_name = Some(meta.container_name.clone());
+            return;
+        }
+        if info.pod_name.is_none()
+            && let Some(uid) = &info.pod_uid
+            && let Some(meta) = self.by_pod_uid.get(uid)
+        {
+            info.pod_name = Some(meta.pod_name.clone());
+            info.pod_namespace = Some(meta.namespace.clone());
+        }
+    }
+}
+
+/// Parse a `/var/log/containers/` symlink filename of the form
+/// `<pod>_<namespace>_<container>-<container-id>.log`. Pod,
+/// namespace, and container names are RFC 1123 labels (no underscores), so the
+/// first two `_` delimit pod and namespace; the trailing 64-hex container ID is
+/// split off the remainder. Returns `(container_id, ContainerMeta)`.
+#[cfg(any(test, target_os = "linux"))]
+fn parse_container_log_name(filename: &str) -> Option<(String, ContainerMeta)> {
+    let stem = filename.strip_suffix(".log")?;
+    let mut parts = stem.splitn(3, '_');
+    let pod_name = parts.next()?;
+    let namespace = parts.next()?;
+    let rest = parts.next()?; // "<container>-<container-id>"
+    let dash = rest.rfind('-')?;
+    let container_name = &rest[..dash];
+    let container_id = &rest[dash + 1..];
+    if container_id.len() != 64 || !container_id.chars().all(|c| c.is_ascii_hexdigit()) {
+        return None;
+    }
+    if pod_name.is_empty() || namespace.is_empty() || container_name.is_empty() {
+        return None;
+    }
+    Some((
+        container_id.to_string(),
+        ContainerMeta {
+            pod_name: pod_name.to_string(),
+            namespace: namespace.to_string(),
+            container_name: container_name.to_string(),
+        },
+    ))
+}
+
+/// Parse a `/var/log/pods/` directory name of the form
+/// `<namespace>_<pod>_<uid>`. Returns `(pod_uid, PodMeta)`.
+#[cfg(any(test, target_os = "linux"))]
+fn parse_pod_dir_name(name: &str) -> Option<(String, PodMeta)> {
+    let mut parts = name.splitn(3, '_');
+    let namespace = parts.next()?;
+    let pod_name = parts.next()?;
+    let uid = parts.next()?;
+    if namespace.is_empty() || pod_name.is_empty() || uid.is_empty() {
+        return None;
+    }
+    Some((
+        uid.to_string(),
+        PodMeta {
+            pod_name: pod_name.to_string(),
+            namespace: namespace.to_string(),
+        },
+    ))
+}
+
+// ---------------------------------------------------------------------------
+// Per-PID procfs socket table for cross-namespace attribution.
+//
+// Under `hostNetwork: true`, the standard procfs path reads `/proc/net/tcp`
+// from the host network namespace, so it cannot see sockets owned by pods in
+// their own netns. The per-PID file `/proc/<pid>/net/tcp` IS netns-aware: it
+// shows the TCP table from the PID's network namespace. By walking
+// `/proc/<pid>/net/{tcp,tcp6,udp,udp6}` for every PID known to live in a
+// kubepods cgroup, we get a netns-aware view of all pod sockets on the node.
+//
+// Cost is modest: a node typically runs a few dozen kubepods PIDs, the files
+// are tiny (one line per active socket), and the table is rebuilt only on the
+// enrichment refresh tick.
+// ---------------------------------------------------------------------------
+
+/// IP protocol distinction for the socket table key. We track TCP and UDP
+/// separately because the same 4-tuple can be reused across protocols.
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
+pub enum SocketProtocol {
+    Tcp,
+    Udp,
+}
+
+/// Lookup key matching what `network::types::ConnectionKey` would produce:
+/// the connection's 4-tuple plus its transport protocol.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct SocketKey {
+    pub protocol: SocketProtocol,
+    pub local: std::net::SocketAddr,
+    pub remote: std::net::SocketAddr,
+}
+
+/// Built from a sweep of `/proc/*/net/{tcp,tcp6,udp,udp6}` for kubepods PIDs.
+/// Provides O(1) lookup by socket 4-tuple to (pid, K8sInfo).
+///
+/// The per-PID tables list every socket in the PID's network namespace, so the
+/// recorded PID is the first kubepods PID seen in that namespace, not
+/// necessarily the process that owns the socket.
+pub struct KubernetesSocketTable {
+    by_key: std::collections::HashMap<SocketKey, (u32, crate::network::types::K8sInfo)>,
+}
+
+impl KubernetesSocketTable {
+    /// A table with no entries; every lookup returns `None`.
+    pub fn empty() -> Self {
+        Self {
+            by_key: std::collections::HashMap::new(),
+        }
+    }
+
+    /// Rebuild the table by walking `/proc/*/cgroup` to find kubepods PIDs and
+    /// then reading each one's per-PID network tables. Linux-only; the no-op
+    /// stub on other platforms returns an empty table.
+    ///
+    /// Each network namespace is read once, and our own namespace is skipped.
+    #[cfg(target_os = "linux")]
+    pub fn build(resolver: &KubernetesResolver) -> Self {
+        let mut by_key = std::collections::HashMap::new();
+        // `/proc/<pid>/net/*` lists every socket in the PID's network
+        // namespace, so one read per namespace is enough. Our own namespace is
+        // pre-marked as covered: the primary lookup already sees it, and
+        // walking it here would pin unrelated sockets on whichever pod PID
+        // sharing it (e.g. a hostNetwork pod) is enumerated first.
+        let mut covered_netns = std::collections::HashSet::new();
+        if let Ok(own) = std::fs::read_link("/proc/self/ns/net") {
+            covered_netns.insert(own);
+        }
+        for pid in discover_kubepods_pids() {
+            // An unreadable namespace link (PID gone, insufficient access)
+            // falls back to reading the PID's tables unconditionally.
+            let netns = std::fs::read_link(format!("/proc/{pid}/ns/net")).ok();
+            if netns.as_ref().is_some_and(|ns| covered_netns.contains(ns)) {
+                continue;
+            }
+            let k8s = match resolver.enrich(pid) {
+                Some(info) => info,
+                None => continue,
+            };
+            let mut read_any = false;
+            for file in ["tcp", "tcp6", "udp", "udp6"] {
+                let path = format!("/proc/{pid}/net/{file}");
+                let contents = match std::fs::read_to_string(&path) {
+                    Ok(s) => s,
+                    Err(_) => continue,
+                };
+                read_any = true;
+                let proto = if file.starts_with("tcp") {
+                    SocketProtocol::Tcp
+                } else {
+                    SocketProtocol::Udp
+                };
+                let is_v6 = file.ends_with('6');
+                // The first line of each table is the column header.
+                for line in contents.lines().skip(1) {
+                    if let Some(key) = parse_proc_net_line(line, proto, is_v6) {
+                        by_key.entry(key).or_insert_with(|| (pid, k8s.clone()));
+                    }
+                }
+            }
+            // Mark the namespace covered only once a table was actually read,
+            // so a PID that exits mid-walk doesn't hide its pod's other PIDs.
+            if read_any && let Some(ns) = netns {
+                covered_netns.insert(ns);
+            }
+        }
+        Self { by_key }
+    }
+
+    #[cfg(not(target_os = "linux"))]
+    pub fn build(_resolver: &KubernetesResolver) -> Self {
+        Self::empty()
+    }
+
+    /// Look up a (pid, K8sInfo) for the given socket 4-tuple. Returns `None`
+    /// when the tuple isn't in any walked kubepods network namespace.
+    pub fn lookup(&self, key: &SocketKey) -> Option<&(u32, crate::network::types::K8sInfo)> {
+        self.by_key.get(key)
+    }
+
+    /// Look up by a rustnet `Connection`, trying both 4-tuple orderings.
+    /// rustnet's local/remote orientation is derived from observed packet
+    /// direction and may not match the socket's local/remote orientation, so
+    /// we probe both. Non-TCP/UDP protocols never have socket-table entries.
+    pub fn lookup_connection(
+        &self,
+        conn: &crate::network::types::Connection,
+    ) -> Option<(u32, crate::network::types::K8sInfo)> {
+        use crate::network::types::Protocol;
+        let protocol = match conn.protocol {
+            Protocol::Tcp => SocketProtocol::Tcp,
+            Protocol::Udp => SocketProtocol::Udp,
+            _ => return None,
+        };
+        let forward = SocketKey {
+            protocol,
+            local: conn.local_addr,
+            remote: conn.remote_addr,
+        };
+        let reverse = SocketKey {
+            protocol,
+            local: conn.remote_addr,
+            remote: conn.local_addr,
+        };
+        self.lookup(&forward)
+            .or_else(|| self.lookup(&reverse))
+            .cloned()
+    }
+}
+
+/// Read a process's short name from `/proc/<pid>/comm`. Used to fill in a
+/// process name for connections attributed via the socket table (which yields
+/// a PID but not a name). Linux-only; returns `None` elsewhere.
+#[cfg(target_os = "linux")]
+pub fn read_process_name(pid: u32) -> Option<String> {
+    let comm = std::fs::read_to_string(format!("/proc/{pid}/comm")).ok()?;
+    let name = comm.trim();
+    if name.is_empty() {
+        None
+    } else {
+        Some(name.to_string())
+    }
+}
+
+#[cfg(not(target_os = "linux"))]
+pub fn read_process_name(_pid: u32) -> Option<String> {
+    None
+}
+
+/// Enumerate numeric `/proc/<pid>` entries and return those whose
+/// `/proc/<pid>/cgroup` mentions `kubepods`.
+#[cfg(target_os = "linux")]
+fn discover_kubepods_pids() -> Vec<u32> {
+    let mut pids = Vec::new();
+    let dir = match std::fs::read_dir("/proc") {
+        Ok(d) => d,
+        Err(_) => return pids,
+    };
+    for entry in dir.flatten() {
+        let name = entry.file_name();
+        let name = match name.to_str() {
+            Some(s) => s,
+            None => continue,
+        };
+        let pid: u32 = match name.parse() {
+            Ok(p) => p,
+            Err(_) => continue,
+        };
+        // Avoid an extra fs::metadata call; rely on /proc/<pid>/cgroup being
+        // readable as proof the PID exists and is accessible to us.
+        if let Ok(cg) = std::fs::read_to_string(format!("/proc/{pid}/cgroup"))
+            && cg.contains("kubepods")
+        {
+            pids.push(pid);
+        }
+    }
+    pids
+}
+
+/// Parse a single line of `/proc/<pid>/net/{tcp,tcp6,udp,udp6}`. The format
+/// looks like:
+///
+///     sl local_address rem_address st tx_queue:rx_queue tr:tm->when retrnsmt uid timeout inode ...
+///
+/// Address columns are hex-encoded. IPv4 is 8 hex chars representing a u32 in
+/// host byte order (the kernel printed the `__be32` value with `%08X`, so on
+/// little-endian boxes the byte order is reversed). IPv6 is 32 hex chars
+/// representing four u32s, each printed in host byte order.
+///
+/// Returns `None` when the line has fewer than three columns or either
+/// address column fails to parse.
+#[cfg(any(test, target_os = "linux"))]
+pub fn parse_proc_net_line(line: &str, protocol: SocketProtocol, is_v6: bool) -> Option<SocketKey> {
+    let mut fields = line.split_whitespace();
+    // Skip the "sl" column.
+    fields.next()?;
+    let local_raw = fields.next()?;
+    let remote_raw = fields.next()?;
+    let local = parse_addr_port(local_raw, is_v6)?;
+    let remote = parse_addr_port(remote_raw, is_v6)?;
+    Some(SocketKey {
+        protocol,
+        local,
+        remote,
+    })
+}
+
+#[cfg(any(test, target_os = "linux"))]
+fn parse_addr_port(field: &str, is_v6: bool) -> Option<std::net::SocketAddr> {
+    let (ip_hex, port_hex) = field.split_once(':')?;
+    let port = u16::from_str_radix(port_hex, 16).ok()?;
+    if is_v6 {
+        let ip = parse_ipv6_hex(ip_hex)?;
+        // Sockets bound to v6 may carry IPv4-mapped addresses (`::ffff:1.2.3.4`)
+        // for incoming v4 traffic. Normalise so the caller's 4-tuple match
+        // works whether the wire frame was v4 or v6.
/// Decode the 8-hex-digit IPv4 address column of `/proc/<pid>/net/{tcp,udp}`.
///
/// The field is `printf("%08X", be32)`: the kernel prints the network-order
/// address as a native integer, so the digits appear byte-reversed on a
/// little-endian host and in natural order on a big-endian one. Taking the
/// parsed value's *native-endian* bytes recovers the original four octets on
/// either kind of host.
///
/// Returns `None` unless `s` is exactly eight ASCII hex digits.
#[cfg(any(test, target_os = "linux"))]
fn parse_ipv4_hex(s: &str) -> Option<std::net::Ipv4Addr> {
    // `from_str_radix` alone would also accept a leading `+`; insist on hex.
    if s.len() != 8 || !s.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    let val = u32::from_str_radix(s, 16).ok()?;
    Some(std::net::Ipv4Addr::from(val.to_ne_bytes()))
}

/// Decode the 32-hex-digit IPv6 address column of `/proc/<pid>/net/{tcp6,udp6}`.
///
/// The field is four 8-digit words, each printed in host byte order, so every
/// word is decoded the same way as an IPv4 field and the resulting 4-byte
/// groups are concatenated.
///
/// Returns `None` unless `s` is exactly 32 ASCII hex digits.
#[cfg(any(test, target_os = "linux"))]
fn parse_ipv6_hex(s: &str) -> Option<std::net::Ipv6Addr> {
    // Validating up front keeps the 8-byte chunking below on ASCII input, so
    // it can never split a multi-byte character.
    if s.len() != 32 || !s.bytes().all(|b| b.is_ascii_hexdigit()) {
        return None;
    }
    let mut bytes = [0u8; 16];
    for (word, out) in s.as_bytes().chunks_exact(8).zip(bytes.chunks_exact_mut(4)) {
        let word = std::str::from_utf8(word).ok()?;
        let val = u32::from_str_radix(word, 16).ok()?;
        out.copy_from_slice(&val.to_ne_bytes());
    }
    Some(std::net::Ipv6Addr::from(bytes))
}

/// `/proc/<pid>/cgroup` lines look like `12:cpu,cpuacct:/some/path` for v1
/// and `0::/some/path` for v2. Return just the path. A line with fewer than
/// two colons is returned unchanged.
#[cfg(any(test, target_os = "linux"))]
fn extract_path(line: &str) -> &str {
    // Skip the first two colon-separated fields; later colons stay in the path.
    line.splitn(3, ':').nth(2).unwrap_or(line)
}

/// Extract a pod UID from a single path segment. Accepts:
/// - `pod<uid>` (raw, e.g. v2 layout)
/// - `kubepods-<qos>-pod<uid>.slice` and similar systemd-encoded forms
///
/// Returns `None` when the segment has no `pod` marker or when the text after
/// it is not a recognisable UUID.
#[cfg(any(test, target_os = "linux"))]
fn extract_pod_uid(segment: &str) -> Option<String> {
    // The systemd-encoded slice form ("kubepods-besteffort-pod<uid>.slice")
    // contains two occurrences of "pod" — the one inside "kubepods" and the
    // marker before the UID. The UID always follows the last occurrence.
    let start = segment.rfind("pod")? + 3;
    let candidate = segment[start..]
        .trim_end_matches(".slice")
        .trim_end_matches(".scope");
    canonicalize_uid(candidate)
}

/// Normalise `123e4567_e89b_12d3_a456_426614174000` or
/// `123e4567-e89b-12d3-a456-426614174000` to the canonical hyphenated form
/// (lowercase). A bare 32-hex-digit string has its hyphens re-inserted.
/// Returns `None` if the input isn't a recognisable UUID.
#[cfg(any(test, target_os = "linux"))]
fn canonicalize_uid(raw: &str) -> Option<String> {
    let lower: String = raw
        .chars()
        .map(|c| if c == '_' { '-' } else { c.to_ascii_lowercase() })
        .collect();

    // Standard UUID form: 8-4-4-4-12 hex with hyphens.
    if is_canonical_uuid(&lower) {
        return Some(lower);
    }

    // Bare 32-hex form without separators: re-insert hyphens. The all-hex
    // check guarantees ASCII, so the byte ranges below are valid boundaries.
    if lower.len() == 32 && lower.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Some(format!(
            "{}-{}-{}-{}-{}",
            &lower[0..8],
            &lower[8..12],
            &lower[12..16],
            &lower[16..20],
            &lower[20..32],
        ));
    }

    None
}

/// Return `true` when `s` is 36 bytes long with `-` at offsets 8, 13, 18 and
/// 23 and ASCII hex digits everywhere else.
#[cfg(any(test, target_os = "linux"))]
fn is_canonical_uuid(s: &str) -> bool {
    s.len() == 36
        && s.bytes().enumerate().all(|(i, b)| match i {
            8 | 13 | 18 | 23 => b == b'-',
            _ => b.is_ascii_hexdigit(),
        })
}

/// Extract the container ID from the last segment of a cgroup path.
///
/// A trailing `.scope` and one well-known runtime prefix are removed; what
/// remains is lowercased and accepted only if it is at least 32 characters of
/// ASCII hex.
#[cfg(any(test, target_os = "linux"))]
fn extract_container_id(segment: &str) -> Option<String> {
    let trimmed = segment.trim_end_matches(".scope");
    // Strip well-known runtime prefixes, first match wins.
    let candidate = ["cri-containerd-", "containerd-", "crio-", "docker-"]
        .iter()
        .find_map(|prefix| trimmed.strip_prefix(*prefix))
        .unwrap_or(trimmed)
        .to_ascii_lowercase();

    if candidate.len() >= 32 && candidate.bytes().all(|b| b.is_ascii_hexdigit()) {
        Some(candidate)
    } else {
        None
    }
}
+ let candidate = trimmed + .strip_prefix("cri-containerd-") + .or_else(|| trimmed.strip_prefix("containerd-")) + .or_else(|| trimmed.strip_prefix("crio-")) + .or_else(|| trimmed.strip_prefix("docker-")) + .unwrap_or(trimmed); + + let candidate = candidate.to_ascii_lowercase(); + if candidate.len() >= 32 && candidate.chars().all(|c| c.is_ascii_hexdigit()) { + Some(candidate) + } else { + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + // 1. cgroup v1 systemd, containerd runtime (typical EKS / kind) + #[test] + fn parses_cgroup_v1_systemd_containerd() { + let line = "12:cpu,cpuacct:/kubepods.slice/kubepods-besteffort.slice/\ + kubepods-besteffort-pod123e4567_e89b_12d3_a456_426614174000.slice/\ + cri-containerd-abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789.scope"; + let info = parse_cgroup(line).expect("recognised"); + assert_eq!( + info.pod_uid.as_deref(), + Some("123e4567-e89b-12d3-a456-426614174000") + ); + assert_eq!( + info.container_id.as_deref(), + Some("abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789") + ); + } + + // 2. cgroup v2 unified containerd + #[test] + fn parses_cgroup_v2_unified_containerd() { + let line = "0::/kubepods/burstable/pod123e4567-e89b-12d3-a456-426614174000/\ + abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"; + let info = parse_cgroup(line).expect("recognised"); + assert_eq!( + info.pod_uid.as_deref(), + Some("123e4567-e89b-12d3-a456-426614174000") + ); + assert_eq!( + info.container_id.as_deref(), + Some("abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789") + ); + } + + // 3. 
cri-o runtime + #[test] + fn parses_crio_scope() { + let line = "0::/kubepods.slice/kubepods-besteffort.slice/\ + kubepods-besteffort-pod123e4567_e89b_12d3_a456_426614174000.slice/\ + crio-abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789.scope"; + let info = parse_cgroup(line).expect("recognised"); + assert_eq!( + info.pod_uid.as_deref(), + Some("123e4567-e89b-12d3-a456-426614174000") + ); + assert_eq!( + info.container_id.as_deref(), + Some("abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789") + ); + } + + // 4. legacy docker shim + #[test] + fn parses_docker_shim_legacy() { + let line = "11:devices:/kubepods/burstable/\ + pod123e4567-e89b-12d3-a456-426614174000/\ + docker-abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789.scope"; + let info = parse_cgroup(line).expect("recognised"); + assert_eq!( + info.pod_uid.as_deref(), + Some("123e4567-e89b-12d3-a456-426614174000") + ); + assert_eq!( + info.container_id.as_deref(), + Some("abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789") + ); + } + + // 5. non-Kubernetes host returns None + #[test] + fn non_k8s_host_returns_none() { + let contents = "0::/user.slice/user-1000.slice/user@1000.service/app.slice/foo.scope\n"; + assert!(parse_cgroup(contents).is_none()); + } + + // 6. 
pod UID normalisation: underscored -> hyphenated, also handles bare 32-hex + #[test] + fn normalises_pod_uid_variants() { + // underscores + assert_eq!( + canonicalize_uid("123e4567_e89b_12d3_a456_426614174000"), + Some("123e4567-e89b-12d3-a456-426614174000".to_string()) + ); + // already hyphenated + assert_eq!( + canonicalize_uid("123E4567-E89B-12D3-A456-426614174000"), + Some("123e4567-e89b-12d3-a456-426614174000".to_string()) + ); + // bare 32-hex (rare, but seen in some pod-keyed cgroup forms) + assert_eq!( + canonicalize_uid("123e4567e89b12d3a456426614174000"), + Some("123e4567-e89b-12d3-a456-426614174000".to_string()) + ); + // garbage + assert_eq!(canonicalize_uid("not-a-uuid"), None); + } + + // 7. kind / kubelet-prefixed cgroup as seen from inside a hostPID:true pod + // with a different cgroup namespace. Path is relative (begins with `../`) + // but the kubepods segment is preserved. + #[test] + fn parses_kubelet_prefixed_relative_path() { + let line = "0::/../../kubelet-kubepods-besteffort.slice/\ + kubelet-kubepods-besteffort-podc3b4d893_473e_43c2_8013_8ee2955a4630.slice/\ + cri-containerd-c16c7605305c854d8582a1db3d5bb3c4b6c89a08e914223e9d500682b3fb0b1b.scope"; + let info = parse_cgroup(line).expect("recognised"); + assert_eq!( + info.pod_uid.as_deref(), + Some("c3b4d893-473e-43c2-8013-8ee2955a4630") + ); + assert_eq!( + info.container_id.as_deref(), + Some("c16c7605305c854d8582a1db3d5bb3c4b6c89a08e914223e9d500682b3fb0b1b") + ); + } + + // Multi-line /proc//cgroup picks the kubepods line. 
+ #[test] + fn picks_kubepods_line_among_many() { + let contents = "13:misc:/\n\ + 12:perf_event:/\n\ + 11:cpu,cpuacct:/kubepods.slice/kubepods-besteffort.slice/\ + kubepods-besteffort-pod123e4567_e89b_12d3_a456_426614174000.slice/\ + cri-containerd-abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789.scope\n\ + 0::/system.slice/garbage.scope\n"; + let info = parse_cgroup(contents).expect("recognised"); + assert_eq!( + info.pod_uid.as_deref(), + Some("123e4567-e89b-12d3-a456-426614174000") + ); + } + + // --- /proc//net/* line parsing ---------------------------------- + + use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; + + #[test] + fn ipv4_hex_is_little_endian() { + // 0100007F => 127.0.0.1 (kernel printed the be32 with %08X on LE host) + assert_eq!( + parse_ipv4_hex("0100007F"), + Some(Ipv4Addr::new(127, 0, 0, 1)) + ); + // 0F02000A => 10.0.2.15 + assert_eq!( + parse_ipv4_hex("0F02000A"), + Some(Ipv4Addr::new(10, 0, 2, 15)) + ); + // 00000000 => 0.0.0.0 (listening wildcard) + assert_eq!(parse_ipv4_hex("00000000"), Some(Ipv4Addr::UNSPECIFIED)); + // wrong length + assert_eq!(parse_ipv4_hex("0100"), None); + } + + #[test] + fn ipv6_hex_chunked_little_endian() { + // ::1 loopback + assert_eq!( + parse_ipv6_hex("00000000000000000000000001000000"), + Some(Ipv6Addr::LOCALHOST) + ); + // all zeros => :: + assert_eq!( + parse_ipv6_hex("00000000000000000000000000000000"), + Some(Ipv6Addr::UNSPECIFIED) + ); + // wrong length + assert_eq!(parse_ipv6_hex("00"), None); + } + + #[test] + fn parses_tcp_v4_line() { + // Real-shape line: listening on 0.0.0.0:8080 (1F90), no remote. 
+ let line = " 0: 00000000:1F90 00000000:0000 0A 00000000:00000000 00:00000000 00000000 1000 0 12345 1 ..."; + let key = parse_proc_net_line(line, SocketProtocol::Tcp, false).expect("parsed"); + assert_eq!(key.protocol, SocketProtocol::Tcp); + assert_eq!( + key.local, + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0x1F90) + ); + assert_eq!( + key.remote, + SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0) + ); + } + + #[test] + fn parses_tcp_v4_established_line() { + // 127.0.0.1:51000 -> 127.0.0.1:443 + let line = " 1: 0100007F:C738 0100007F:01BB 01 00000000:00000000 00:00000000 00000000 1000 0 99999 1 ..."; + let key = parse_proc_net_line(line, SocketProtocol::Tcp, false).expect("parsed"); + assert_eq!( + key.local, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0xC738) + ); + assert_eq!( + key.remote, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 443) + ); + } + + #[test] + fn parses_udp_v4_line() { + let line = " 2: 0F02000A:0035 00000000:0000 07 00000000:00000000 00:00000000 00000000 101 0 54321 2 ..."; + let key = parse_proc_net_line(line, SocketProtocol::Udp, false).expect("parsed"); + assert_eq!(key.protocol, SocketProtocol::Udp); + assert_eq!( + key.local, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 2, 15)), 0x35) + ); + } + + #[test] + fn parses_tcp_v6_line() { + // ::1:8080 listening + let line = " 0: 00000000000000000000000001000000:1F90 \ + 00000000000000000000000000000000:0000 0A 00000000:00000000 00:00000000 00000000 1000 0 12345 1 ..."; + let key = parse_proc_net_line(line, SocketProtocol::Tcp, true).expect("parsed"); + assert_eq!( + key.local, + SocketAddr::new(IpAddr::V6(Ipv6Addr::LOCALHOST), 0x1F90) + ); + } + + #[test] + fn v6_line_with_ipv4_mapped_address_normalises_to_v4() { + // ::ffff:10.0.2.15 — an IPv4-mapped v6 socket. 
s6_addr32 = [0,0,0xFFFF0000, + // 0x0F02000A] printed in host order: 00000000 00000000 0000FFFF 0F02000A + let line = " 0: 0000000000000000FFFF00000F02000A:0050 \ + 00000000000000000000000000000000:0000 0A 00000000:00000000 00:00000000 00000000 1000 0 1 1 ..."; + let key = parse_proc_net_line(line, SocketProtocol::Tcp, true).expect("parsed"); + assert_eq!( + key.local, + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 2, 15)), 80) + ); + } + + #[test] + fn rejects_malformed_lines() { + assert!(parse_proc_net_line("garbage", SocketProtocol::Tcp, false).is_none()); + assert!( + parse_proc_net_line(" 0: nocolon 00000000:0000", SocketProtocol::Tcp, false).is_none() + ); + assert!(parse_proc_net_line("", SocketProtocol::Tcp, false).is_none()); + } + + // --- kubelet log directory metadata parsing -------------------------- + + #[test] + fn parses_container_log_symlink_name() { + let name = "coredns-7d764666f9-c9hxr_kube-system_coredns-\ + 2212d876c8edeb3216424f078dc37475050b23a09c601bdcf6e55bc06f1e0bbc.log"; + let (cid, meta) = parse_container_log_name(name).expect("parsed"); + assert_eq!( + cid, + "2212d876c8edeb3216424f078dc37475050b23a09c601bdcf6e55bc06f1e0bbc" + ); + assert_eq!(meta.pod_name, "coredns-7d764666f9-c9hxr"); + assert_eq!(meta.namespace, "kube-system"); + assert_eq!(meta.container_name, "coredns"); + } + + #[test] + fn parses_container_log_with_dashed_container_name() { + // Container names may contain dashes; the 64-hex ID is split off last. 
+ let name = "my-app-7d764666f9-abcde_default_side-car-\ + abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789.log"; + let (cid, meta) = parse_container_log_name(name).expect("parsed"); + assert_eq!( + cid, + "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789" + ); + assert_eq!(meta.pod_name, "my-app-7d764666f9-abcde"); + assert_eq!(meta.namespace, "default"); + assert_eq!(meta.container_name, "side-car"); + } + + #[test] + fn rejects_bad_container_log_names() { + // Missing .log + assert!(parse_container_log_name("foo_bar_baz-deadbeef").is_none()); + // Container ID not 64 hex + assert!(parse_container_log_name("pod_ns_ctr-shortid.log").is_none()); + // Too few underscore-separated fields + assert!(parse_container_log_name("podonly.log").is_none()); + } + + #[test] + fn parses_pod_dir_name() { + let (uid, meta) = super::parse_pod_dir_name( + "demo-traffic_nginx-86644db9cc-mf5lx_c3b4d893-473e-43c2-8013-8ee2955a4630", + ) + .expect("parsed"); + assert_eq!(uid, "c3b4d893-473e-43c2-8013-8ee2955a4630"); + assert_eq!(meta.pod_name, "nginx-86644db9cc-mf5lx"); + assert_eq!(meta.namespace, "demo-traffic"); + } + + #[test] + fn metadata_apply_prefers_container_id_then_pod_uid() { + use crate::network::types::K8sInfo; + let mut meta = PodMetadata::default(); + meta.by_container_id.insert( + "cid64".to_string(), + ContainerMeta { + pod_name: "web-1".to_string(), + namespace: "shop".to_string(), + container_name: "nginx".to_string(), + }, + ); + meta.by_pod_uid.insert( + "uid-1".to_string(), + PodMeta { + pod_name: "web-1".to_string(), + namespace: "shop".to_string(), + }, + ); + + // Container-ID hit fills all three name fields. 
+ let mut info = K8sInfo { + container_id: Some("cid64".to_string()), + pod_uid: Some("uid-1".to_string()), + ..Default::default() + }; + meta.apply(&mut info); + assert_eq!(info.pod_name.as_deref(), Some("web-1")); + assert_eq!(info.pod_namespace.as_deref(), Some("shop")); + assert_eq!(info.container_name.as_deref(), Some("nginx")); + + // Sandbox container (no container-ID match) falls back to pod UID; + // container name stays None. + let mut sandbox = K8sInfo { + container_id: Some("unknown-sandbox-id".to_string()), + pod_uid: Some("uid-1".to_string()), + ..Default::default() + }; + meta.apply(&mut sandbox); + assert_eq!(sandbox.pod_name.as_deref(), Some("web-1")); + assert_eq!(sandbox.pod_namespace.as_deref(), Some("shop")); + assert_eq!(sandbox.container_name, None); + } +} diff --git a/src/network/mod.rs b/src/network/mod.rs index adc2b47..4ac7706 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -11,6 +11,8 @@ pub mod dns; pub mod dpi; pub mod geoip; pub mod interface_stats; +#[cfg(feature = "kubernetes")] +pub mod kubernetes; pub mod link_layer; pub mod merge; pub mod oui; diff --git a/src/network/types.rs b/src/network/types.rs index e36a3b0..edee4d4 100644 --- a/src/network/types.rs +++ b/src/network/types.rs @@ -1848,6 +1848,25 @@ fn smooth_rate(raw: f64, prev: f64) -> f64 { } } +/// Kubernetes pod and container metadata attached to a connection when the +/// owning process is part of a pod on the current node. Populated by the +/// resolver in `network::kubernetes`; `None` when rustnet is not running +/// inside (or with visibility into) a Kubernetes node. +/// +/// `pod_uid`, `container_id`, and `cgroup_path` come from `/proc//cgroup`. +/// The human-readable `pod_name`, `pod_namespace`, and `container_name` are +/// resolved from the on-disk kubelet pods directory when available. 
+#[cfg(feature = "kubernetes")] +#[derive(Debug, Clone, Default)] +pub struct K8sInfo { + pub pod_uid: Option, + pub pod_name: Option, + pub pod_namespace: Option, + pub container_id: Option, + pub container_name: Option, + pub cgroup_path: Option, +} + #[derive(Debug, Clone)] pub struct Connection { // Core identification @@ -1862,6 +1881,10 @@ pub struct Connection { pub pid: Option, pub process_name: Option, + // Kubernetes attribution (pod/container), populated on K8s nodes + #[cfg(feature = "kubernetes")] + pub k8s_info: Option, + // Connection direction: true = outgoing (local initiated), false = incoming (remote initiated) // Only set for TCP when we observe the handshake (SYN/SYN+ACK), None otherwise pub connection_direction: Option, @@ -1926,6 +1949,8 @@ impl Connection { protocol_state: state, pid: None, process_name: None, + #[cfg(feature = "kubernetes")] + k8s_info: None, connection_direction: None, bytes_sent: 0, bytes_received: 0, diff --git a/src/ui/mod.rs b/src/ui/mod.rs index 0aa1e19..0ca355d 100644 --- a/src/ui/mod.rs +++ b/src/ui/mod.rs @@ -753,6 +753,8 @@ mod snapshot_tests { geoip_asn_path: None, geoip_city_path: None, disable_geoip: true, + #[cfg(feature = "kubernetes")] + kubernetes_mode: crate::network::kubernetes::KubernetesMode::default(), } } diff --git a/src/ui/tabs/details.rs b/src/ui/tabs/details.rs index 8c977ed..178e0fb 100644 --- a/src/ui/tabs/details.rs +++ b/src/ui/tabs/details.rs @@ -415,6 +415,73 @@ pub(in crate::ui) fn draw_connection_details( } } + // Kubernetes attribution (pod / container) when the owning process is in + // a kubepods cgroup. Only rendered when the resolver populated `k8s_info`. + #[cfg(feature = "kubernetes")] + if let Some(ref k8s) = conn.k8s_info { + let k8s_value_style = theme::fg(theme::field_process()); + push_detail_section(&mut details_text, &mut detail_fields, "Kubernetes"); + // Prefer the human-readable pod name over the raw UID; show both when + // both are present. 
+ if let Some(ref name) = k8s.pod_name { + let pod_display = if let Some(ref ns) = k8s.pod_namespace { + format!("{}/{}", ns, name) + } else { + name.clone() + }; + push_detail_field_styled( + &mut details_text, + &mut detail_fields, + "Pod", + pod_display, + label_style, + k8s_value_style, + ); + } + if let Some(ref uid) = k8s.pod_uid { + push_detail_field_styled( + &mut details_text, + &mut detail_fields, + "Pod UID", + uid.clone(), + label_style, + k8s_value_style, + ); + } + if let Some(ref cname) = k8s.container_name { + push_detail_field_styled( + &mut details_text, + &mut detail_fields, + "Container", + cname.clone(), + label_style, + k8s_value_style, + ); + } + if let Some(ref cid) = k8s.container_id { + // Container IDs are 64 hex chars; truncate to the short form + // typically shown by `kubectl get pod ... -o wide`. + let short = if cid.len() >= 12 { &cid[..12] } else { cid }; + push_detail_field_styled( + &mut details_text, + &mut detail_fields, + "Container ID", + short.to_string(), + label_style, + k8s_value_style, + ); + } + if let Some(ref path) = k8s.cgroup_path { + push_detail_field( + &mut details_text, + &mut detail_fields, + "Cgroup", + path.clone(), + label_style, + ); + } + } + // Add DPI / application protocol information. Section heading carries // both the label and the protocol so we don't need a redundant // "Application: " field below. diff --git a/tests/kubernetes_resolver_live.rs b/tests/kubernetes_resolver_live.rs new file mode 100644 index 0000000..04e16af --- /dev/null +++ b/tests/kubernetes_resolver_live.rs @@ -0,0 +1,55 @@ +//! Live integration test for the Kubernetes resolver. +//! +//! Runs only on Linux with the `kubernetes` feature enabled, and only when the +//! current process is inside a kubepods cgroup. On any other host the test +//! reports `Ok` after a single quick probe so CI on non-K8s machines stays +//! green. The intended invocation is from a debug pod on a Kubernetes node: +//! +//! 
cargo test --features kubernetes --test kubernetes_resolver_live -- --nocapture + +#![cfg(all(target_os = "linux", feature = "kubernetes"))] + +use rustnet_monitor::network::kubernetes::{KubernetesResolver, lookup_for_pid}; + +#[test] +fn resolver_picks_up_pod_uid_for_current_process() { + let cg = std::fs::read_to_string("/proc/self/cgroup").expect("read /proc/self/cgroup"); + eprintln!("===/proc/self/cgroup===\n{cg}"); + + if !cg.contains("kubepods") { + eprintln!( + "Not inside a kubepods cgroup — skipping live assertion. This is expected on non-Kubernetes hosts." + ); + return; + } + + let pid = std::process::id(); + let cgroup = lookup_for_pid(pid).expect("parser recognised current kubepods cgroup"); + eprintln!("===CgroupInfo for pid {pid}===\n{cgroup:?}"); + assert!( + cgroup.pod_uid.is_some(), + "pod_uid should be populated for a kubepods process" + ); + assert!( + cgroup.container_id.is_some(), + "container_id should be populated for a kubepods process" + ); + + let resolver = KubernetesResolver::new(); + let info = resolver + .enrich(pid) + .expect("resolver returned None for current pid inside kubepods"); + eprintln!("===K8sInfo===\n{info:?}"); + assert!(info.pod_uid.is_some(), "K8sInfo.pod_uid populated"); + assert!( + info.container_id.is_some(), + "K8sInfo.container_id populated" + ); + assert!(info.cgroup_path.is_some(), "K8sInfo.cgroup_path populated"); + + // Second call should hit the in-memory cache. + let cached = resolver + .enrich(pid) + .expect("cached lookup returned None unexpectedly"); + assert_eq!(cached.pod_uid, info.pod_uid); +}