diff --git a/fc-agent/src/agent.rs b/fc-agent/src/agent.rs index 5bdae0a4..fa335ddf 100644 --- a/fc-agent/src/agent.rs +++ b/fc-agent/src/agent.rs @@ -64,7 +64,7 @@ pub async fn run() -> Result<()> { // Breaks the 30s poll loop when POLLHUP is not delivered after snapshot restore. let restore_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); - // Exec server rebind signal — shared by restore-epoch watcher and cache-ready handshake. + // Exec server rebind signal — shared by restore-epoch watcher. // After vsock transport reset, the listener's AsyncFd epoll becomes stale. // We use BOTH Notify (to wake the select loop) and AtomicBool (to persist the signal). // tokio::select! can lose Notify permits when accept() and notified() are both Ready @@ -72,11 +72,19 @@ pub async fn run() -> Result<()> { let exec_rebind = std::sync::Arc::new(tokio::sync::Notify::new()); let exec_rebind_needed = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + // Exec rebind confirmation — exec server signals after re_register() completes. + // handle_clone_restore waits on this before reconnecting output, ensuring exec is + // ready before the host starts health-checking via exec. + let exec_rebind_done = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let exec_rebind_done_notify = std::sync::Arc::new(tokio::sync::Notify::new()); + // Start restore-epoch watcher let watcher_output = output.clone(); let watcher_restore_flag = restore_flag.clone(); let watcher_exec_rebind = exec_rebind.clone(); let watcher_exec_rebind_needed = exec_rebind_needed.clone(); + let watcher_exec_rebind_done = exec_rebind_done.clone(); + let watcher_exec_rebind_done_notify = exec_rebind_done_notify.clone(); tokio::spawn(async move { eprintln!("[fc-agent] starting restore-epoch watcher"); mmds::watch_restore_epoch( @@ -84,6 +92,8 @@ pub async fn run() -> Result<()> { watcher_restore_flag, watcher_exec_rebind, watcher_exec_rebind_needed, + watcher_exec_rebind_done, + watcher_exec_rebind_done_notify, ) .await; }); @@ -92,8 +102,17 @@ pub async fn run() -> Result<()> { let (exec_ready_tx, exec_ready_rx) = tokio::sync::oneshot::channel(); let exec_rebind_clone = exec_rebind.clone(); let exec_rebind_needed_clone = exec_rebind_needed.clone(); + let exec_rebind_done_clone = exec_rebind_done.clone(); + let exec_rebind_done_notify_clone = exec_rebind_done_notify.clone(); tokio::spawn(async move { - exec::run_server(exec_ready_tx, exec_rebind_clone, exec_rebind_needed_clone).await; + exec::run_server( + exec_ready_tx, + exec_rebind_clone, + exec_rebind_needed_clone, + exec_rebind_done_clone, + exec_rebind_done_notify_clone, + ) + .await; }); match tokio::time::timeout(Duration::from_secs(5), exec_ready_rx).await { @@ -233,12 +252,11 @@ pub async fn run() -> Result<()> { if container::notify_cache_ready_and_wait(&digest, &restore_flag) { eprintln!("[fc-agent] cache ready notification acknowledged"); // Pre-start snapshot was taken and we've been restored into a new - // Firecracker instance. The vsock transport was reset, which - // invalidates the exec server's listener socket (stale AsyncFd epoll). - // Signal it to re-bind. Set flag BEFORE notify to prevent race where - // select! drops the Notified future (see exec.rs doc comment). - exec_rebind_needed.store(true, std::sync::atomic::Ordering::Release); - exec_rebind.notify_one(); + // Firecracker instance. Exec rebind + output reconnect are handled + // by handle_clone_restore() via the restore-epoch watcher. + // Do NOT signal exec rebind here — a duplicate re_register() corrupts + // the AsyncFd epoll registration, causing health checks to hang for ~60s + // (see plan trace evidence for the smoking gun vsock muxer logs). } else { eprintln!("[fc-agent] WARNING: cache-ready handshake failed, continuing"); } diff --git a/fc-agent/src/exec.rs b/fc-agent/src/exec.rs index c9fbf211..8cfb9925 100644 --- a/fc-agent/src/exec.rs +++ b/fc-agent/src/exec.rs @@ -31,6 +31,8 @@ pub async fn run_server( ready_tx: tokio::sync::oneshot::Sender<()>, rebind_signal: Arc, rebind_needed: Arc, + rebind_done: Arc, + rebind_done_notify: Arc, ) { eprintln!( "[fc-agent] starting exec server on vsock port {}", @@ -64,6 +66,8 @@ pub async fn run_server( "[fc-agent] exec server: vsock transport reset (flag), re-registering listener" ); listener = do_re_register(listener).await; + rebind_done.store(true, Ordering::Release); + rebind_done_notify.notify_one(); } tokio::select! { @@ -82,6 +86,8 @@ pub async fn run_server( rebind_needed.store(false, Ordering::Release); eprintln!("[fc-agent] exec server: vsock transport reset (notify), re-registering listener"); listener = do_re_register(listener).await; + rebind_done.store(true, Ordering::Release); + rebind_done_notify.notify_one(); } } } diff --git a/fc-agent/src/mmds.rs b/fc-agent/src/mmds.rs index e4a5ece4..82417b1d 100644 --- a/fc-agent/src/mmds.rs +++ b/fc-agent/src/mmds.rs @@ -139,6 +139,8 @@ pub async fn watch_restore_epoch( restore_flag: std::sync::Arc, exec_rebind: std::sync::Arc, exec_rebind_needed: std::sync::Arc, + exec_rebind_done: std::sync::Arc, + exec_rebind_done_notify: std::sync::Arc, ) { let mut last_epoch: Option = None; @@ -171,6 +173,8 @@ pub async fn watch_restore_epoch( &output, &exec_rebind, &exec_rebind_needed, + &exec_rebind_done, + &exec_rebind_done_notify, ) .await; last_epoch = metadata.restore_epoch; @@ -182,6 +186,8 @@ pub async fn watch_restore_epoch( &output, &exec_rebind, &exec_rebind_needed, + &exec_rebind_done, + &exec_rebind_done_notify, ) .await; last_epoch = metadata.restore_epoch; diff --git a/fc-agent/src/restore.rs b/fc-agent/src/restore.rs index 22a7e4d7..1670892b 100644 --- a/fc-agent/src/restore.rs +++ b/fc-agent/src/restore.rs @@ -6,7 +6,12 @@ use tokio::sync::Notify; use crate::network; use crate::output::OutputHandle; -/// Handle clone restore: kill stale sockets, flush ARP, reconnect output + exec. +/// Handle clone restore: kill stale sockets, flush ARP, re-register exec, reconnect output. +/// +/// CRITICAL ordering: exec re-register MUST complete before output reconnect. +/// The host uses the output connection as a readiness signal — once connected, +/// it starts the health monitor which calls `fcvm exec`. If exec's AsyncFd epoll +/// is still stale, health checks hang for ~60s (see trace evidence in plan). /// /// FUSE volumes are NOT remounted here. The reconnectable multiplexer /// detects the dead vsock and auto-reconnects to the clone's VolumeServer. @@ -15,20 +20,37 @@ pub async fn handle_clone_restore( output: &OutputHandle, exec_rebind: &Arc, exec_rebind_needed: &Arc, + exec_rebind_done: &Arc, + exec_rebind_done_notify: &Arc, ) { network::kill_stale_tcp_connections().await; network::flush_arp_cache().await; network::send_gratuitous_arp().await; - // Reconnect output vsock (broken by snapshot vsock reset). - // FUSE vsock reconnection is handled automatically by the reconnectable multiplexer. - output.reconnect(); - - // Re-bind exec server listener (AsyncFd epoll stale after transport reset). - // Set flag BEFORE notify to prevent race where select! drops the Notified future - // (see exec.rs doc comment for detailed explanation). + // FIRST: Re-register exec server listener (AsyncFd epoll stale after transport reset). + // Reset confirmation flag, then signal. Set flag BEFORE notify to prevent race + // where select! drops the Notified future (see exec.rs doc comment). + exec_rebind_done.store(false, Ordering::Release); exec_rebind_needed.store(true, Ordering::Release); exec_rebind.notify_one(); - eprintln!("[fc-agent] signaled output + exec vsock reconnect after restore"); + // SECOND: Wait for exec server to confirm re-register completed. + // This ensures accept() works before the host can reach the exec server. + match tokio::time::timeout( + std::time::Duration::from_secs(5), + exec_rebind_done_notify.notified(), + ) + .await + { + Ok(()) => { + eprintln!("[fc-agent] exec re-registered, proceeding to output reconnect") + } + Err(_) => eprintln!("[fc-agent] WARNING: exec re-register timed out (5s)"), + } + + // THIRD: Reconnect output vsock (tells host we're alive + exec is ready). + // FUSE vsock reconnection is handled automatically by the reconnectable multiplexer. + output.reconnect(); + + eprintln!("[fc-agent] signaled exec rebind + output reconnect after restore"); } diff --git a/src/commands/podman/listeners.rs b/src/commands/podman/listeners.rs index 1b2df8ee..61a170ae 100644 --- a/src/commands/podman/listeners.rs +++ b/src/commands/podman/listeners.rs @@ -175,6 +175,7 @@ pub(crate) async fn run_output_listener( log_tx: Option>, reconnect_notify: Arc, non_blocking_output: bool, + connected_tx: Option>, ) -> Result> { use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::net::UnixListener; @@ -225,6 +226,9 @@ pub(crate) async fn run_output_listener( }; connection_count += 1; debug!(vm_id = %vm_id, connection_count, "Output connection established"); + if let Some(tx) = connected_tx { + let _ = tx.send(()); + } let mut reader = BufReader::new(initial_stream); let mut line_buf = String::new(); @@ -403,7 +407,7 @@ mod tests { let socket_str = socket_path.to_string_lossy().to_string(); let reconnect = std::sync::Arc::new(tokio::sync::Notify::new()); let handle = tokio::spawn(async move { - run_output_listener(&socket_str, "test-vm", None, reconnect, lossy).await + run_output_listener(&socket_str, "test-vm", None, reconnect, lossy, None).await }); for _ in 0..50 { if socket_path.exists() { diff --git a/src/commands/podman/mod.rs b/src/commands/podman/mod.rs index defb758d..2d431d10 100644 --- a/src/commands/podman/mod.rs +++ b/src/commands/podman/mod.rs @@ -784,6 +784,7 @@ pub async fn prepare_vm(mut args: RunArgs) -> Result> { log_tx_clone, reconnect, non_blocking_output, + None, ) .await { diff --git a/src/commands/snapshot.rs b/src/commands/snapshot.rs index 4afabd51..8f9fe2dd 100644 --- a/src/commands/snapshot.rs +++ b/src/commands/snapshot.rs @@ -601,6 +601,8 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> { // listener to drop its dead vsock stream and re-accept. Without this, the listener // stays stuck reading from the old (dead) connection after VM resume resets vsock. let output_reconnect = Arc::new(tokio::sync::Notify::new()); + // Channel to know when fc-agent's output connection arrives (gates health monitor) + let (output_connected_tx, output_connected_rx) = tokio::sync::oneshot::channel(); let output_handle = if !tty_mode { let socket_path = output_socket_path.clone(); let vm_id_clone = vm_id.clone(); @@ -612,6 +614,7 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> { None, reconnect, non_blocking_output, + Some(output_connected_tx), ) .await { @@ -857,14 +860,9 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> { let (mut vm_manager, mut holder_child) = setup_result.unwrap(); - // For startup snapshots (container already running), the output listener has an - // active connection from fc-agent that's now dead after VM resume. Signal it to - // drop the dead stream and re-accept. For pre-start snapshots (container not yet - // started), the listener is fresh with no connection — DON'T notify, or the - // stored permit will poison the first real connection by dropping it immediately. - if args.startup_snapshot_base_key.is_some() { - output_reconnect.notify_one(); - } + // fc-agent's handle_clone_restore() now drives the output reconnect sequence: + // exec rebind → wait for confirmation → output.reconnect(). No host-side + // notify needed — the listener will accept fc-agent's new connection naturally. let is_uffd = use_uffd || std::env::var("FCVM_FORCE_UFFD").is_ok() || hugepages; if is_uffd { @@ -972,6 +970,21 @@ pub async fn cmd_snapshot_run(args: SnapshotRunArgs) -> Result<()> { (None, None) }; + // Wait for fc-agent output connection before starting health monitor. + // This ensures the deterministic handshake chain is complete: + // exec_rebind → exec_re_register → rebind_done → output.reconnect() → HERE + // Without this gate, the health monitor could start exec calls before + // the exec server has re-registered its AsyncFd after restore. + if !tty_mode { + match tokio::time::timeout(std::time::Duration::from_secs(30), output_connected_rx).await { + Ok(Ok(())) => info!(vm_id = %vm_id, "fc-agent output connected, exec server ready"), + Ok(Err(_)) => warn!(vm_id = %vm_id, "output connected_tx dropped"), + Err(_) => { + warn!(vm_id = %vm_id, "fc-agent did not connect within 30s, proceeding anyway") + } + } + } + // Spawn health monitor task with startup snapshot trigger support let health_monitor_handle = crate::health::spawn_health_monitor_full( vm_id.clone(), diff --git a/tests/test_localhost_rootless.rs b/tests/test_localhost_rootless.rs index 608cee8a..5d8c9b4b 100644 --- a/tests/test_localhost_rootless.rs +++ b/tests/test_localhost_rootless.rs @@ -290,6 +290,8 @@ async fn test_localhost_rootless_btrfs_snapshot_restore() -> Result<()> { .context("spawning fcvm (second run)")?; println!(" fcvm PID: {}", fcvm_pid); + // Pre-start snapshot restore still needs to run `podman run` from scratch + // (container doesn't exist yet), so it needs the same timeout as a fresh boot. common::poll_health_by_pid(fcvm_pid, 120) .await .context("second run (from snapshot): VM failed to become healthy")?; @@ -360,6 +362,74 @@ async fn test_localhost_rootless_btrfs_snapshot_restore() -> Result<()> { driver ); + // Step 4: Exec stress test — fire 10 parallel exec calls with tight timeouts. + // Catches the double-rebind regression where exec hangs for ~150s after restore. + // Each call must complete within 10s (the bug caused 150s hangs). + println!("\n4. Post-restore exec stress test (10 parallel calls, 10s timeout each)..."); + let stress_start = std::time::Instant::now(); + let mut handles: Vec>> = + Vec::new(); + for i in 0..10 { + let handle = tokio::spawn(async move { + let call_start = std::time::Instant::now(); + let result = tokio::time::timeout( + tokio::time::Duration::from_secs(10), + common::exec_in_vm(fcvm_pid, &["echo", &format!("stress-{}", i)]), + ) + .await; + let elapsed = call_start.elapsed(); + match result { + Ok(Ok(output)) => { + let output = output.trim().to_string(); + assert_eq!( + output, + format!("stress-{}", i), + "exec call {} returned wrong output", + i + ); + println!( + " exec call {} completed in {:.1}ms", + i, + elapsed.as_secs_f64() * 1000.0 + ); + Ok(elapsed) + } + Ok(Err(e)) => { + panic!("exec call {} failed: {}", i, e); + } + Err(_) => { + panic!( + "exec call {} timed out after 10s — exec server likely broken by double rebind", + i + ); + } + } + }); + handles.push(handle); + } + + let mut max_latency = std::time::Duration::ZERO; + for handle in handles { + let elapsed = handle.await.expect("exec stress task panicked")?; + if elapsed > max_latency { + max_latency = elapsed; + } + } + let total_elapsed = stress_start.elapsed(); + println!( + " All 10 exec calls completed in {:.1}ms (max single: {:.1}ms)", + total_elapsed.as_secs_f64() * 1000.0, + max_latency.as_secs_f64() * 1000.0 + ); + // Sanity check: all 10 parallel calls should finish in under 30s total. + // With the bug, they'd each timeout at 10s = 10s total (parallel), or + // if sequential would be 150s+ total. + assert!( + total_elapsed.as_secs() < 30, + "exec stress test took {}s — expected <30s for 10 parallel calls", + total_elapsed.as_secs() + ); + common::kill_process(fcvm_pid).await; } @@ -367,6 +437,7 @@ async fn test_localhost_rootless_btrfs_snapshot_restore() -> Result<()> { println!(" - First run: fresh boot, btrfs image, snapshot created"); println!(" - Second run: restored from snapshot, btrfs image isolated per-clone"); println!(" - Username preserved across snapshot for health checks"); + println!(" - Post-restore exec stress: 10 parallel calls completed within timeout"); Ok(()) }