34 changes: 26 additions & 8 deletions fc-agent/src/agent.rs
@@ -64,26 +64,36 @@ pub async fn run() -> Result<()> {
     // Breaks the 30s poll loop when POLLHUP is not delivered after snapshot restore.
     let restore_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
 
-    // Exec server rebind signal — shared by restore-epoch watcher and cache-ready handshake.
+    // Exec server rebind signal — shared by restore-epoch watcher.
     // After vsock transport reset, the listener's AsyncFd epoll becomes stale.
     // We use BOTH Notify (to wake the select loop) and AtomicBool (to persist the signal).
     // tokio::select! can lose Notify permits when accept() and notified() are both Ready
     // simultaneously — the AtomicBool flag catches this race.
     let exec_rebind = std::sync::Arc::new(tokio::sync::Notify::new());
     let exec_rebind_needed = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
 
+    // Exec rebind confirmation — exec server signals after re_register() completes.
+    // handle_clone_restore waits on this before reconnecting output, ensuring exec is
+    // ready before the host starts health-checking via exec.
+    let exec_rebind_done = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
+    let exec_rebind_done_notify = std::sync::Arc::new(tokio::sync::Notify::new());
+
     // Start restore-epoch watcher
     let watcher_output = output.clone();
     let watcher_restore_flag = restore_flag.clone();
     let watcher_exec_rebind = exec_rebind.clone();
     let watcher_exec_rebind_needed = exec_rebind_needed.clone();
+    let watcher_exec_rebind_done = exec_rebind_done.clone();
+    let watcher_exec_rebind_done_notify = exec_rebind_done_notify.clone();
     tokio::spawn(async move {
         eprintln!("[fc-agent] starting restore-epoch watcher");
         mmds::watch_restore_epoch(
             watcher_output,
             watcher_restore_flag,
             watcher_exec_rebind,
             watcher_exec_rebind_needed,
+            watcher_exec_rebind_done,
+            watcher_exec_rebind_done_notify,
         )
         .await;
     });
@@ -92,8 +102,17 @@ pub async fn run() -> Result<()> {
     let (exec_ready_tx, exec_ready_rx) = tokio::sync::oneshot::channel();
     let exec_rebind_clone = exec_rebind.clone();
     let exec_rebind_needed_clone = exec_rebind_needed.clone();
+    let exec_rebind_done_clone = exec_rebind_done.clone();
+    let exec_rebind_done_notify_clone = exec_rebind_done_notify.clone();
     tokio::spawn(async move {
-        exec::run_server(exec_ready_tx, exec_rebind_clone, exec_rebind_needed_clone).await;
+        exec::run_server(
+            exec_ready_tx,
+            exec_rebind_clone,
+            exec_rebind_needed_clone,
+            exec_rebind_done_clone,
+            exec_rebind_done_notify_clone,
+        )
+        .await;
     });
 
     match tokio::time::timeout(Duration::from_secs(5), exec_ready_rx).await {
@@ -233,12 +252,11 @@ pub async fn run() -> Result<()> {
     if container::notify_cache_ready_and_wait(&digest, &restore_flag) {
         eprintln!("[fc-agent] cache ready notification acknowledged");
         // Pre-start snapshot was taken and we've been restored into a new
-        // Firecracker instance. The vsock transport was reset, which
-        // invalidates the exec server's listener socket (stale AsyncFd epoll).
-        // Signal it to re-bind. Set flag BEFORE notify to prevent race where
-        // select! drops the Notified future (see exec.rs doc comment).
-        exec_rebind_needed.store(true, std::sync::atomic::Ordering::Release);
-        exec_rebind.notify_one();
+        // Firecracker instance. Exec rebind + output reconnect are handled
+        // by handle_clone_restore() via the restore-epoch watcher.
+        // Do NOT signal exec rebind here — a duplicate re_register() corrupts
+        // the AsyncFd epoll registration, causing health checks to hang for ~60s
+        // (see plan trace evidence for the smoking gun vsock muxer logs).
     } else {
         eprintln!("[fc-agent] WARNING: cache-ready handshake failed, continuing");
     }
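The Notify + AtomicBool pairing that agent.rs documents above is the load-bearing idea of this PR, so it is worth seeing in isolation. A minimal sketch of the level-triggered signal (all names here are illustrative, not the PR's API): the flag is checked before every select!, so a request whose Notify permit was consumed by a dropped Notified future still gets serviced on the next loop iteration.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::Notify;

async fn server_loop(signal: Arc<Notify>, needed: Arc<AtomicBool>) {
    loop {
        // Level-triggered check: catches a request even if the select! below
        // dropped a ready Notified future (losing its permit) on a prior turn.
        if needed.swap(false, Ordering::AcqRel) {
            rebind().await;
        }
        tokio::select! {
            _ = signal.notified() => {
                needed.store(false, Ordering::Release);
                rebind().await;
            }
            _ = work() => { /* e.g. listener.accept() in the real server */ }
        }
    }
}

async fn rebind() { /* stand-in for do_re_register() */ }
async fn work() { tokio::time::sleep(std::time::Duration::from_millis(100)).await }

The sender's side of the contract is the two lines the PR keeps repeating: needed.store(true, Release) first, signal.notify_one() second, so the request stays observable even when the wakeup itself is lost.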
6 changes: 6 additions & 0 deletions fc-agent/src/exec.rs
@@ -31,6 +31,8 @@ pub async fn run_server(
     ready_tx: tokio::sync::oneshot::Sender<()>,
     rebind_signal: Arc<Notify>,
     rebind_needed: Arc<AtomicBool>,
+    rebind_done: Arc<AtomicBool>,
+    rebind_done_notify: Arc<Notify>,
 ) {
     eprintln!(
         "[fc-agent] starting exec server on vsock port {}",
@@ -64,6 +66,8 @@
                 "[fc-agent] exec server: vsock transport reset (flag), re-registering listener"
             );
             listener = do_re_register(listener).await;
+            rebind_done.store(true, Ordering::Release);
+            rebind_done_notify.notify_one();
         }
 
         tokio::select! {
@@ -82,6 +86,8 @@
                 rebind_needed.store(false, Ordering::Release);
                 eprintln!("[fc-agent] exec server: vsock transport reset (notify), re-registering listener");
                 listener = do_re_register(listener).await;
+                rebind_done.store(true, Ordering::Release);
+                rebind_done_notify.notify_one();
             }
         }
     }
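The two lines added after each do_re_register() call follow a publish-then-wake idiom. A hedged sketch of just that pair (illustrative names, not the PR's API): the Release store publishes the completed re-registration before the wakeup, and an Acquire load on the waiting side is guaranteed to observe it.

use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify;

// Publisher: runs in the exec server right after re-registration succeeds.
fn confirm_rebind(done: &AtomicBool, done_notify: &Notify) {
    done.store(true, Ordering::Release); // publish completion first...
    done_notify.notify_one();            // ...then wake any waiter
}

// Waiter: an Acquire load pairs with the Release store above, so observing
// `true` implies the listener was already re-registered.
fn rebind_confirmed(done: &AtomicBool) -> bool {
    done.load(Ordering::Acquire)
}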
6 changes: 6 additions & 0 deletions fc-agent/src/mmds.rs
@@ -139,6 +139,8 @@ pub async fn watch_restore_epoch(
     restore_flag: std::sync::Arc<std::sync::atomic::AtomicBool>,
     exec_rebind: std::sync::Arc<tokio::sync::Notify>,
     exec_rebind_needed: std::sync::Arc<std::sync::atomic::AtomicBool>,
+    exec_rebind_done: std::sync::Arc<std::sync::atomic::AtomicBool>,
+    exec_rebind_done_notify: std::sync::Arc<tokio::sync::Notify>,
 ) {
     let mut last_epoch: Option<String> = None;
 
@@ -171,6 +173,8 @@
                 &output,
                 &exec_rebind,
                 &exec_rebind_needed,
+                &exec_rebind_done,
+                &exec_rebind_done_notify,
             )
             .await;
             last_epoch = metadata.restore_epoch;
@@ -182,6 +186,8 @@
                 &output,
                 &exec_rebind,
                 &exec_rebind_needed,
+                &exec_rebind_done,
+                &exec_rebind_done_notify,
             )
             .await;
             last_epoch = metadata.restore_epoch;
40 changes: 31 additions & 9 deletions fc-agent/src/restore.rs
@@ -6,7 +6,12 @@ use tokio::sync::Notify;
 use crate::network;
 use crate::output::OutputHandle;
 
-/// Handle clone restore: kill stale sockets, flush ARP, reconnect output + exec.
+/// Handle clone restore: kill stale sockets, flush ARP, re-register exec, reconnect output.
+///
+/// CRITICAL ordering: exec re-register MUST complete before output reconnect.
+/// The host uses the output connection as a readiness signal — once connected,
+/// it starts the health monitor which calls `fcvm exec`. If exec's AsyncFd epoll
+/// is still stale, health checks hang for ~60s (see trace evidence in plan).
 ///
 /// FUSE volumes are NOT remounted here. The reconnectable multiplexer
 /// detects the dead vsock and auto-reconnects to the clone's VolumeServer.
@@ -15,20 +20,37 @@ pub async fn handle_clone_restore(
     output: &OutputHandle,
     exec_rebind: &Arc<Notify>,
     exec_rebind_needed: &Arc<AtomicBool>,
+    exec_rebind_done: &Arc<AtomicBool>,
+    exec_rebind_done_notify: &Arc<Notify>,
 ) {
     network::kill_stale_tcp_connections().await;
     network::flush_arp_cache().await;
     network::send_gratuitous_arp().await;
 
-    // Reconnect output vsock (broken by snapshot vsock reset).
-    // FUSE vsock reconnection is handled automatically by the reconnectable multiplexer.
-    output.reconnect();
-
-    // Re-bind exec server listener (AsyncFd epoll stale after transport reset).
-    // Set flag BEFORE notify to prevent race where select! drops the Notified future
-    // (see exec.rs doc comment for detailed explanation).
+    // FIRST: Re-register exec server listener (AsyncFd epoll stale after transport reset).
+    // Reset confirmation flag, then signal. Set flag BEFORE notify to prevent race
+    // where select! drops the Notified future (see exec.rs doc comment).
+    exec_rebind_done.store(false, Ordering::Release);
     exec_rebind_needed.store(true, Ordering::Release);
     exec_rebind.notify_one();
 
-    eprintln!("[fc-agent] signaled output + exec vsock reconnect after restore");
+    // SECOND: Wait for exec server to confirm re-register completed.
+    // This ensures accept() works before the host can reach the exec server.
+    match tokio::time::timeout(
+        std::time::Duration::from_secs(5),
+        exec_rebind_done_notify.notified(),
+    )
+    .await
+    {
+        Ok(()) => {
+            eprintln!("[fc-agent] exec re-registered, proceeding to output reconnect")
+        }
+        Err(_) => eprintln!("[fc-agent] WARNING: exec re-register timed out (5s)"),
+    }
+
+    // THIRD: Reconnect output vsock (tells host we're alive + exec is ready).
+    // FUSE vsock reconnection is handled automatically by the reconnectable multiplexer.
+    output.reconnect();
+
+    eprintln!("[fc-agent] signaled exec rebind + output reconnect after restore");
 }
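handle_clone_restore waits on the Notify alone, which is safe for a single waiter because notify_one() stores a permit when nobody is waiting yet. If the wait ever needed to be level-triggered too (say, with multiple waiters), a defensive variant would consult the flag first. A hypothetical helper, not in the PR:

use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use tokio::sync::Notify;

// Wait for a confirmation with a deadline; returns true if confirmed.
// The flag check makes the wait level-triggered, mirroring the request side.
async fn wait_confirmed(done: &AtomicBool, done_notify: &Notify, deadline: Duration) -> bool {
    if done.load(Ordering::Acquire) {
        return true; // confirmation landed before we started waiting
    }
    tokio::time::timeout(deadline, done_notify.notified()).await.is_ok()
}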
6 changes: 5 additions & 1 deletion src/commands/podman/listeners.rs
@@ -175,6 +175,7 @@ pub(crate) async fn run_output_listener(
     log_tx: Option<tokio::sync::broadcast::Sender<LogLine>>,
     reconnect_notify: Arc<tokio::sync::Notify>,
     non_blocking_output: bool,
+    mut connected_tx: Option<tokio::sync::oneshot::Sender<()>>,
 ) -> Result<Vec<(String, String)>> {
     use tokio::io::{AsyncBufReadExt, BufReader};
     use tokio::net::UnixListener;
@@ -225,6 +226,9 @@
         };
         connection_count += 1;
         debug!(vm_id = %vm_id, connection_count, "Output connection established");
+        if let Some(tx) = connected_tx.take() {
+            let _ = tx.send(());
+        }
 
         let mut reader = BufReader::new(initial_stream);
         let mut line_buf = String::new();
@@ -403,7 +407,7 @@ mod tests {
         let socket_str = socket_path.to_string_lossy().to_string();
         let reconnect = std::sync::Arc::new(tokio::sync::Notify::new());
         let handle = tokio::spawn(async move {
-            run_output_listener(&socket_str, "test-vm", None, reconnect, lossy).await
+            run_output_listener(&socket_str, "test-vm", None, reconnect, lossy, None).await
         });
         for _ in 0..50 {
             if socket_path.exists() {
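The Option<Sender> threaded into run_output_listener is a fire-once signal inside a multi-connection accept loop. A compact sketch of the ownership trick (stubs stand in for the real listener): take() consumes the sender on the first connection and leaves None behind, so connections re-accepted after a restore cannot touch it again.

use tokio::sync::oneshot;

async fn accept_loop(mut first_conn_tx: Option<oneshot::Sender<()>>) {
    loop {
        accept().await; // stand-in for listener.accept()
        if let Some(tx) = first_conn_tx.take() {
            let _ = tx.send(()); // receiver may be gone; ignore the error
        }
        // ... stream this connection's output ...
    }
}

async fn accept() {}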
1 change: 1 addition & 0 deletions src/commands/podman/mod.rs
@@ -784,6 +784,7 @@ pub async fn prepare_vm(mut args: RunArgs) -> Result<Option<VmContext>> {
             log_tx_clone,
             reconnect,
             non_blocking_output,
+            None,
         )
         .await
         {
29 changes: 21 additions & 8 deletions src/commands/snapshot.rs
@@ -601,6 +601,8 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> {
     // listener to drop its dead vsock stream and re-accept. Without this, the listener
     // stays stuck reading from the old (dead) connection after VM resume resets vsock.
     let output_reconnect = Arc::new(tokio::sync::Notify::new());
+    // Channel to know when fc-agent's output connection arrives (gates health monitor)
+    let (output_connected_tx, output_connected_rx) = tokio::sync::oneshot::channel();
     let output_handle = if !tty_mode {
         let socket_path = output_socket_path.clone();
         let vm_id_clone = vm_id.clone();
@@ -612,6 +614,7 @@
             None,
             reconnect,
             non_blocking_output,
+            Some(output_connected_tx),
         )
         .await
         {
@@ -857,14 +860,9 @@
 
     let (mut vm_manager, mut holder_child) = setup_result.unwrap();
 
-    // For startup snapshots (container already running), the output listener has an
-    // active connection from fc-agent that's now dead after VM resume. Signal it to
-    // drop the dead stream and re-accept. For pre-start snapshots (container not yet
-    // started), the listener is fresh with no connection — DON'T notify, or the
-    // stored permit will poison the first real connection by dropping it immediately.
-    if args.startup_snapshot_base_key.is_some() {
-        output_reconnect.notify_one();
-    }
+    // fc-agent's handle_clone_restore() now drives the output reconnect sequence:
+    // exec rebind → wait for confirmation → output.reconnect(). No host-side
+    // notify needed — the listener will accept fc-agent's new connection naturally.
 
     let is_uffd = use_uffd || std::env::var("FCVM_FORCE_UFFD").is_ok() || hugepages;
     if is_uffd {
@@ -972,6 +970,21 @@
         (None, None)
     };
 
+    // Wait for fc-agent output connection before starting health monitor.
+    // This ensures the deterministic handshake chain is complete:
+    // exec_rebind → exec_re_register → rebind_done → output.reconnect() → HERE
+    // Without this gate, the health monitor could start exec calls before
+    // the exec server has re-registered its AsyncFd after restore.
+    if !tty_mode {
+        match tokio::time::timeout(std::time::Duration::from_secs(30), output_connected_rx).await {
+            Ok(Ok(())) => info!(vm_id = %vm_id, "fc-agent output connected, exec server ready"),
+            Ok(Err(_)) => warn!(vm_id = %vm_id, "output connected_tx dropped"),
+            Err(_) => {
+                warn!(vm_id = %vm_id, "fc-agent did not connect within 30s, proceeding anyway")
+            }
+        }
+    }
+
     // Spawn health monitor task with startup snapshot trigger support
     let health_monitor_handle = crate::health::spawn_health_monitor_full(
         vm_id.clone(),
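End to end, the restore handshake is now one deterministic chain: rebind signal → re-register → confirmation → output reconnect → health monitor. A self-contained simulation of that chain (everything stubbed and renamed; no vsock, exec server, or health monitor involved):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{oneshot, Notify};

#[tokio::main]
async fn main() {
    let rebind = Arc::new(Notify::new());
    let rebind_needed = Arc::new(AtomicBool::new(false));
    let rebind_done_notify = Arc::new(Notify::new());
    let (connected_tx, connected_rx) = oneshot::channel::<()>();

    // Guest exec server: on signal, "re-register" the listener, then confirm.
    let (sig, needed, done) = (rebind.clone(), rebind_needed.clone(), rebind_done_notify.clone());
    tokio::spawn(async move {
        sig.notified().await;
        if needed.swap(false, Ordering::AcqRel) {
            // listener = do_re_register(listener).await would run here
            done.notify_one();
        }
    });

    // Guest restore handler: FIRST signal (flag before notify), SECOND await
    // confirmation, THIRD reconnect output: the ordering restore.rs enforces.
    rebind_needed.store(true, Ordering::Release);
    rebind.notify_one();
    tokio::time::timeout(Duration::from_secs(5), rebind_done_notify.notified())
        .await
        .expect("exec re-register timed out");
    connected_tx.send(()).ok(); // stands in for output.reconnect()

    // Host: gate the health monitor on the first output connection.
    tokio::time::timeout(Duration::from_secs(30), connected_rx)
        .await
        .expect("fc-agent never connected")
        .expect("sender dropped");
    println!("exec ready; health monitor may start");
}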
71 changes: 71 additions & 0 deletions tests/test_localhost_rootless.rs
@@ -290,6 +290,8 @@ async fn test_localhost_rootless_btrfs_snapshot_restore() -> Result<()> {
            .context("spawning fcvm (second run)")?;
        println!(" fcvm PID: {}", fcvm_pid);
 
+        // Pre-start snapshot restore still needs to run `podman run` from scratch
+        // (container doesn't exist yet), so it needs the same timeout as a fresh boot.
        common::poll_health_by_pid(fcvm_pid, 120)
            .await
            .context("second run (from snapshot): VM failed to become healthy")?;
@@ -360,13 +362,82 @@
             driver
         );
 
+        // Step 4: Exec stress test — fire 10 parallel exec calls with tight timeouts.
+        // Catches the double-rebind regression where exec hangs for ~150s after restore.
+        // Each call must complete within 10s; with the bug, calls hang until the timeout.
+        println!("\n4. Post-restore exec stress test (10 parallel calls, 10s timeout each)...");
+        let stress_start = std::time::Instant::now();
+        let mut handles: Vec<tokio::task::JoinHandle<anyhow::Result<std::time::Duration>>> =
+            Vec::new();
+        for i in 0..10 {
+            let handle = tokio::spawn(async move {
+                let call_start = std::time::Instant::now();
+                let result = tokio::time::timeout(
+                    tokio::time::Duration::from_secs(10),
+                    common::exec_in_vm(fcvm_pid, &["echo", &format!("stress-{}", i)]),
+                )
+                .await;
+                let elapsed = call_start.elapsed();
+                match result {
+                    Ok(Ok(output)) => {
+                        let output = output.trim().to_string();
+                        assert_eq!(
+                            output,
+                            format!("stress-{}", i),
+                            "exec call {} returned wrong output",
+                            i
+                        );
+                        println!(
+                            " exec call {} completed in {:.1}ms",
+                            i,
+                            elapsed.as_secs_f64() * 1000.0
+                        );
+                        Ok(elapsed)
+                    }
+                    Ok(Err(e)) => {
+                        panic!("exec call {} failed: {}", i, e);
+                    }
+                    Err(_) => {
+                        panic!(
+                            "exec call {} timed out after 10s — exec server likely broken by double rebind",
+                            i
+                        )
+                    }
+                }
+            });
+            handles.push(handle);
+        }
+
+        let mut max_latency = std::time::Duration::ZERO;
+        for handle in handles {
+            let elapsed = handle.await.expect("exec stress task panicked")?;
+            if elapsed > max_latency {
+                max_latency = elapsed;
+            }
+        }
+        let total_elapsed = stress_start.elapsed();
+        println!(
+            " All 10 exec calls completed in {:.1}ms (max single: {:.1}ms)",
+            total_elapsed.as_secs_f64() * 1000.0,
+            max_latency.as_secs_f64() * 1000.0
+        );
+        // Sanity check: all 10 parallel calls should finish in under 30s total.
+        // The per-call timeouts above already panic on hangs; this assert is a
+        // backstop in case the calls serialize (with the bug, 150s+ total).
+        assert!(
+            total_elapsed.as_secs() < 30,
+            "exec stress test took {}s — expected <30s for 10 parallel calls",
+            total_elapsed.as_secs()
+        );
+
         common::kill_process(fcvm_pid).await;
     }
 
     println!("\n BTRFS SNAPSHOT RESTORE TEST PASSED");
     println!(" - First run: fresh boot, btrfs image, snapshot created");
     println!(" - Second run: restored from snapshot, btrfs image isolated per-clone");
     println!(" - Username preserved across snapshot for health checks");
+    println!(" - Post-restore exec stress: 10 parallel calls completed within timeout");
 
     Ok(())
 }