Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ jobs:
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
# Enable Kubernetes pod/container attribution in the container image.
# It is dependency-free and runtime-gated (`--kubernetes auto`, the
# default, is inert outside a pod), so a single image serves both
# general container use and kubectl-rustnet. Native installs
# (cargo/brew/deb/rpm) leave the feature off and stay lean.
build-args: |
CARGO_FEATURES=kubernetes
cache-from: type=gha
cache-to: type=gha,mode=max

Expand Down
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ linux-default = ["ebpf"] # Deprecated: kept for backwards compatibility
ebpf = ["libbpf-rs", "dep:libbpf-cargo"]
landlock = ["dep:landlock", "dep:caps"]
macos-sandbox = []
# Kubernetes pod and container resolution from cgroup paths and (Phase 2) the
# on-disk kubelet pods directory. Linux-only at runtime; no extra dependencies.
kubernetes = []

# Minimal cross configuration to override dependency conflicts
[workspace.metadata.cross.build.env]
Expand Down
12 changes: 10 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@ COPY benches ./benches
COPY assets/services ./assets/services
COPY assets/oui.gz ./assets/oui.gz

# Build the application in release mode (eBPF is enabled by default on Linux)
RUN cargo build --release
# Build the application in release mode (eBPF is enabled by default on Linux).
# Optional features can be added with --build-arg CARGO_FEATURES=kubernetes
# (additive: default features stay on). The CI Kubernetes image variant passes
# CARGO_FEATURES=kubernetes; the default image leaves it empty.
ARG CARGO_FEATURES=""
RUN if [ -n "$CARGO_FEATURES" ]; then \
cargo build --release --features "$CARGO_FEATURES"; \
else \
cargo build --release; \
fi

# Runtime stage - use trixie-slim to match GLIBC version from builder
FROM debian:trixie-slim
Expand Down
1 change: 1 addition & 0 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ The experimental eBPF support provides efficient process identification but has
- **Real-time Process Updates**: Track process name changes and executable updates
- **Container Support**: Better process identification within containerized environments
- **Security Context**: Include process security attributes (capabilities, SELinux context, etc.)
- **Cross-Namespace Attribution for Kubernetes**: The current procfs fallback reads `/proc/net/tcp` from the reader's network namespace, so under `hostNetwork: true` (as used by kubectl-rustnet) it never sees sockets owned by pods in their own netns. The kubernetes feature ships a scoped per-PID `/proc/<pid>/net/{tcp,tcp6,udp,udp6}` walker that covers TCP+UDP for kubepods PIDs, but it ticks at the enrichment interval and so misses sub-tick ephemeral flows. The complete fix lives in the eBPF layer: kprobes/fentry are netns-agnostic and fire at `connect()`/`accept()` time, but the current socket-tracker map is being pruned more aggressively than userspace can consume. Plan: extend map retention (or switch to a ring buffer of close events that userspace drains opportunistically), debug the "Map Lookup Miss" path under Kubernetes traffic patterns, and verify cross-namespace coverage end-to-end in a kind cluster. This work also benefits ICMP and raw-socket attribution, which procfs cannot reach.

## Features

Expand Down
54 changes: 54 additions & 0 deletions examples/k8s_resolver_check.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
//! Smoke-test binary: takes a PID on the command line, runs the Kubernetes
//! resolver against `/proc/<pid>/cgroup`, and prints the resolver's view.
//! Intended for running inside a `hostPID: true` pod on a Kubernetes node:
//!
//! cargo build --release --features kubernetes --example k8s_resolver_check
//! ./k8s_resolver_check $(pidof nginx | awk '{print $1}')

#[cfg(feature = "kubernetes")]
fn main() {
let pid: u32 = std::env::args()
.nth(1)
.expect("usage: k8s_resolver_check <pid>")
.parse()
.expect("PID must be an integer");

let path = format!("/proc/{pid}/cgroup");
let raw = std::fs::read_to_string(&path).unwrap_or_else(|e| {
eprintln!("read {path}: {e}");
std::process::exit(2);
});
println!("--- /proc/{pid}/cgroup ---");
println!("{raw}");

let resolver = rustnet_monitor::network::kubernetes::KubernetesResolver::new();
match resolver.enrich(pid) {
Some(info) => {
println!("--- resolver output ---");
println!("{info:#?}");
// Mirror the JSON shape that log_connection_event would emit.
let mut obj = serde_json::Map::new();
if let Some(v) = info.pod_uid {
obj.insert("pod_uid".into(), serde_json::json!(v));
}
if let Some(v) = info.container_id {
obj.insert("container_id".into(), serde_json::json!(v));
}
if let Some(v) = info.cgroup_path {
obj.insert("cgroup_path".into(), serde_json::json!(v));
}
println!("--- JSONL 'kubernetes' block ---");
println!("{}", serde_json::to_string(&obj).unwrap());
}
None => {
println!("--- resolver returned None (no kubepods cgroup) ---");
std::process::exit(1);
}
}
}

#[cfg(not(feature = "kubernetes"))]
fn main() {
eprintln!("Build with --features kubernetes");
std::process::exit(2);
}
121 changes: 121 additions & 0 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,33 @@ fn log_connection_event(
event["process_name"] = json!(process_name);
}

// Add Kubernetes attribution if the process is part of a pod
#[cfg(feature = "kubernetes")]
if let Some(k8s) = &conn.k8s_info {
let mut obj = serde_json::Map::new();
if let Some(v) = &k8s.pod_uid {
obj.insert("pod_uid".into(), json!(v));
}
if let Some(v) = &k8s.pod_name {
obj.insert("pod_name".into(), json!(v));
}
if let Some(v) = &k8s.pod_namespace {
obj.insert("pod_namespace".into(), json!(v));
}
if let Some(v) = &k8s.container_id {
obj.insert("container_id".into(), json!(v));
}
if let Some(v) = &k8s.container_name {
obj.insert("container_name".into(), json!(v));
}
if let Some(v) = &k8s.cgroup_path {
obj.insert("cgroup_path".into(), json!(v));
}
if !obj.is_empty() {
event["kubernetes"] = serde_json::Value::Object(obj);
}
}

// Add service name if available
if let Some(service_name) = &conn.service_name {
event["service_name"] = json!(service_name);
Expand Down Expand Up @@ -306,6 +333,33 @@ fn log_pcap_connection(pcap_path: &str, conn: &Connection) {
"state": conn.state(),
});

// Add Kubernetes attribution if the process is part of a pod
#[cfg(feature = "kubernetes")]
if let Some(k8s) = &conn.k8s_info {
let mut obj = serde_json::Map::new();
if let Some(v) = &k8s.pod_uid {
obj.insert("pod_uid".into(), json!(v));
}
if let Some(v) = &k8s.pod_name {
obj.insert("pod_name".into(), json!(v));
}
if let Some(v) = &k8s.pod_namespace {
obj.insert("pod_namespace".into(), json!(v));
}
if let Some(v) = &k8s.container_id {
obj.insert("container_id".into(), json!(v));
}
if let Some(v) = &k8s.container_name {
obj.insert("container_name".into(), json!(v));
}
if let Some(v) = &k8s.cgroup_path {
obj.insert("cgroup_path".into(), json!(v));
}
if !obj.is_empty() {
event["kubernetes"] = serde_json::Value::Object(obj);
}
}

// Only add GeoIP fields when they have actual values
if let Some(ref geoip) = conn.geoip_info {
if let Some(ref cc) = geoip.country_code {
Expand Down Expand Up @@ -364,6 +418,9 @@ pub struct Config {
pub geoip_city_path: Option<String>,
/// Disable GeoIP lookups entirely
pub disable_geoip: bool,
/// Kubernetes pod/container attribution mode
#[cfg(feature = "kubernetes")]
pub kubernetes_mode: crate::network::kubernetes::KubernetesMode,
}

impl Default for Config {
Expand All @@ -382,6 +439,8 @@ impl Default for Config {
geoip_asn_path: None,
geoip_city_path: None,
disable_geoip: false,
#[cfg(feature = "kubernetes")]
kubernetes_mode: crate::network::kubernetes::KubernetesMode::default(),
}
}
}
Expand Down Expand Up @@ -1036,6 +1095,8 @@ impl App {
let pktap_active = Arc::clone(&self.pktap_active);
let should_stop = Arc::clone(&self.should_stop);
let process_detection_status = Arc::clone(&self.process_detection_status);
#[cfg(feature = "kubernetes")]
let kubernetes_mode = self.config.kubernetes_mode;

thread::Builder::new()
.name("process-enrichment".to_string())
Expand Down Expand Up @@ -1089,6 +1150,8 @@ impl App {
pktap_active,
process_detection_status,
process_ready_tx,
#[cfg(feature = "kubernetes")]
kubernetes_mode,
) {
error!("Process enrichment thread failed: {}", e);
}
Expand All @@ -1105,6 +1168,7 @@ impl App {
pktap_active: Arc<AtomicBool>,
process_detection_status: Arc<RwLock<ProcessDetectionStatus>>,
process_ready_tx: std::sync::mpsc::SyncSender<()>,
#[cfg(feature = "kubernetes")] kubernetes_mode: crate::network::kubernetes::KubernetesMode,
) -> Result<()> {
use crate::network::platform::DegradationReason;

Expand All @@ -1113,6 +1177,25 @@ impl App {

let process_lookup = create_process_lookup(use_pktap)?;

// Kubernetes pod/container attribution. `auto` enables only when rustnet
// is itself running inside a pod, so the resolver and the cross-namespace
// socket table are only built when enabled and non-Kubernetes hosts do
// no extra /proc work. The table stays empty when disabled.
#[cfg(feature = "kubernetes")]
let kubernetes_resolver = kubernetes_mode
.enabled()
.then(crate::network::kubernetes::KubernetesResolver::new);
#[cfg(feature = "kubernetes")]
if kubernetes_resolver.is_some() {
info!("Kubernetes pod/container attribution enabled");
}
#[cfg(feature = "kubernetes")]
let mut k8s_socket_table = crate::network::kubernetes::KubernetesSocketTable::empty();
#[cfg(feature = "kubernetes")]
if let Some(resolver) = &kubernetes_resolver {
k8s_socket_table = crate::network::kubernetes::KubernetesSocketTable::build(resolver);
}

// Signal that process detection (including eBPF loading) is complete.
// The main thread waits for this before dropping eBPF capabilities.
let _ = process_ready_tx.send(());
Expand Down Expand Up @@ -1163,6 +1246,14 @@ impl App {
if let Err(e) = process_lookup.refresh() {
debug!("Process lookup refresh failed: {}", e);
}
// Refresh pod-name metadata and rebuild the cross-namespace
// socket table on the same cadence.
#[cfg(feature = "kubernetes")]
if let Some(resolver) = &kubernetes_resolver {
resolver.refresh_metadata();
k8s_socket_table =
crate::network::kubernetes::KubernetesSocketTable::build(resolver);
}
last_refresh = Instant::now();
}

Expand Down Expand Up @@ -1217,6 +1308,36 @@ impl App {
if did_enrich {
enriched += 1;
}

// Look up Kubernetes pod/container metadata for the PID.
// Cheap after the first hit per PID (cached in the resolver).
#[cfg(feature = "kubernetes")]
if let Some(resolver) = &kubernetes_resolver
&& entry.k8s_info.is_none()
&& let Some(k8s) = resolver.enrich(pid)
{
entry.k8s_info = Some(k8s);
}
} else {
// The primary lookup couldn't attribute this connection.
// Under hostNetwork, that includes every pod-owned socket
// living in another network namespace. The socket table
// walks per-PID /proc/<pid>/net/* (netns-aware) for kubepods
// PIDs and matches the 4-tuple, yielding both the PID and
// its pod/container metadata.
#[cfg(feature = "kubernetes")]
if entry.pid.is_none()
&& let Some((pid, k8s)) = k8s_socket_table.lookup_connection(&entry)
{
entry.pid = Some(pid);
if entry.process_name.is_none() {
entry.process_name = crate::network::kubernetes::read_process_name(pid);
}
if entry.k8s_info.is_none() {
entry.k8s_info = Some(k8s);
}
enriched += 1;
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,19 @@ pub fn build_cli() -> Command {
.action(clap::ArgAction::SetTrue),
);

#[cfg(feature = "kubernetes")]
let cmd = cmd.arg(
Arg::new("kubernetes")
.long("kubernetes")
.value_name("MODE")
.help(
"Kubernetes pod/container attribution: \"auto\" (enable only when running inside a pod), \"on\" (always), or \"off\"",
)
.value_parser(["auto", "on", "off"])
.default_value("auto")
.required(false),
);

#[cfg(any(
target_os = "linux",
target_os = "windows",
Expand Down
Loading
Loading