diff --git a/iroh/examples/custom-transport.rs b/iroh/examples/custom-transport.rs index 945a13f1b9d..4766755aa93 100644 --- a/iroh/examples/custom-transport.rs +++ b/iroh/examples/custom-transport.rs @@ -11,7 +11,6 @@ use iroh::{ test_utils::test_transport::{TEST_TRANSPORT_ID, TestNetwork, TestTransport}, }; use n0_error::{Result, StdResultExt}; -use n0_watcher::Watcher; /// Each protocol is identified by its ALPN string. /// @@ -124,7 +123,7 @@ async fn main() -> Result<()> { // Helper to print paths and verify test transport is selected let verify_test_transport = |label: &str| { - let paths = conn.paths().get(); + let paths = conn.paths(); println!("Paths {}:", label); for path in paths.iter() { println!( @@ -134,8 +133,8 @@ async fn main() -> Result<()> { path.rtt() ); } - let selected_path = paths.iter().find(|p| p.is_selected()); - let is_test_transport = selected_path.is_some_and(|p| { + let selected_path = paths.selected(); + let is_test_transport = selected_path.as_ref().is_some_and(|p| { matches!(p.remote_addr(), TransportAddr::Custom(addr) if addr.id() == TEST_TRANSPORT_ID) }); assert!( @@ -143,7 +142,7 @@ async fn main() -> Result<()> { "Expected test transport (id={}) to be selected {}, got: {:?}", TEST_TRANSPORT_ID, label, - selected_path.map(|p| p.remote_addr()) + selected_path.as_ref().map(|p| p.remote_addr()) ); println!( "Verified: test transport (id={}) is selected {}", diff --git a/iroh/examples/monitor-connections.rs b/iroh/examples/monitor-connections.rs index 6a1b9cc5aee..6ece2ded714 100644 --- a/iroh/examples/monitor-connections.rs +++ b/iroh/examples/monitor-connections.rs @@ -1,7 +1,7 @@ use std::{sync::Arc, time::Duration}; use iroh::{ - Endpoint, Watcher, + Endpoint, endpoint::{ AfterHandshakeOutcome, Closed, Connection, EndpointHooks, WeakConnectionHandle, presets, }, @@ -133,7 +133,7 @@ impl Monitor { Some(MonitoredConnection { alpn, remote_id, handle }) = rx.recv() => { let alpn = String::from_utf8_lossy(&alpn).to_string(); let remote = remote_id.fmt_short(); - let rtt = handle.upgrade().and_then(|c| c.paths().peek().iter().map(|p| p.stats().expect("conn is not dropped").rtt).min()); + let rtt = handle.upgrade().and_then(|c| c.paths().iter().map(|p| p.rtt()).min()); info!(%remote, %alpn, ?rtt, "new connection"); tasks.spawn(async move { match handle.closed().await { diff --git a/iroh/examples/remote-info.rs b/iroh/examples/remote-info.rs index deff86efafa..ffc36a2106b 100644 --- a/iroh/examples/remote-info.rs +++ b/iroh/examples/remote-info.rs @@ -101,10 +101,10 @@ fn log_active(remote_map: &RemoteMap) { "[{}] is_active {}, connections {}, ip_path {:?}, relay_path {:?}, current_min_rtt {:?}", id.fmt_short(), info.is_active(), - info.connections().count(), + info.active_connections(), info.has_ip_path(), info.has_relay_path(), - info.current_min_rtt() + info.current_min_rtt(), ); } } @@ -126,7 +126,7 @@ fn log_aggregate(remote_map: &RemoteMap) { aggregate.relay_path, SystemTime::now() .duration_since(aggregate.last_update) - .unwrap() + .unwrap_or_default() ); } } @@ -166,14 +166,13 @@ mod remote_map { }; use iroh::{ - EndpointId, Watcher, + EndpointId, TransportAddr, endpoint::{ - AfterHandshakeOutcome, Connection, EndpointHooks, PathInfo, WeakConnectionHandle, + AfterHandshakeOutcome, Connection, EndpointHooks, PathEvent, WeakConnectionHandle, }, }; - use n0_future::task::AbortOnDropHandle; + use n0_future::{StreamExt, task::AbortOnDropHandle}; use tokio::{sync::mpsc, task::JoinSet}; - use tokio_stream::StreamExt; use tracing::{Instrument, debug, info, info_span}; /// Information about a remote info. @@ -211,19 +210,13 @@ mod remote_map { } impl Aggregate { - fn update(&mut self, path: &PathInfo) { + fn update(&mut self, addr: TransportAddr, stats: iroh::endpoint::PathStats) { self.last_update = SystemTime::now(); - if path.is_ip() { - self.ip_path = true; - } - if path.is_relay() { - self.relay_path = true; - } - if let Some(stats) = path.stats() { - debug!("path update addr {:?} {stats:?}", path.remote_addr()); - self.rtt_min = self.rtt_min.min(stats.rtt); - self.rtt_max = self.rtt_max.max(stats.rtt); - } + self.ip_path |= addr.is_ip(); + self.relay_path |= addr.is_relay(); + debug!("path update {addr} {stats:?}"); + self.rtt_min = self.rtt_min.min(stats.rtt); + self.rtt_max = self.rtt_max.max(stats.rtt); } } @@ -239,46 +232,37 @@ mod remote_map { /// /// Returns `None` if there are no active connections. pub fn current_min_rtt(&self) -> Option { - self.connections() - .flat_map(|c| c.upgrade()) - .flat_map(|c| c.paths().get().into_iter()) - .flat_map(|p| p.stats()) - .map(|s| s.rtt) + self.upgraded() + .filter_map(|c| c.paths().iter().map(|p| p.rtt()).min()) .min() } /// Returns whether any active connection to the remote has an active IP path. - /// - /// Returns `None` if there are no active connections. - pub fn has_ip_path(&self) -> Option { - self.connections() - .flat_map(|c| c.upgrade()) - .flat_map(|c| c.paths().get()) - .filter(|path| path.is_ip()) - .map(|_| true) - .next() + pub fn has_ip_path(&self) -> bool { + self.upgraded().any(|c| c.paths().iter().any(|p| p.is_ip())) } /// Returns whether any active connection to the remote has an active relay path. - /// - /// Returns `None` if there are no active connections. - pub fn has_relay_path(&self) -> Option { - self.connections() - .flat_map(|c| c.upgrade()) - .flat_map(|c| c.paths().get()) - .filter(|path| path.is_relay()) - .map(|_| true) - .next() + pub fn has_relay_path(&self) -> bool { + self.upgraded() + .any(|c| c.paths().iter().any(|p| p.is_relay())) } - /// Returns `true` if there are active connections to this node. + /// Returns `true` if there are active connections to this remote. pub fn is_active(&self) -> bool { - !self.connections.is_empty() + self.active_connections() > 0 + } + + /// Returns the number of active connections to this remote. + pub fn active_connections(&self) -> usize { + self.upgraded().count() } - /// Returns an iterator over [`WeakConnectionHandle`] for currently active connections to this remote. - pub fn connections(&self) -> impl Iterator { - self.connections.values() + /// Returns an iterator over all active handles upgraded to a [`Connection`]. + fn upgraded(&self) -> impl Iterator { + self.connections + .values() + .filter_map(WeakConnectionHandle::upgrade) } } @@ -368,18 +352,14 @@ mod remote_map { // Main loop loop { tokio::select! { - conn = rx.recv() => { - match conn { - Some(conn) => { - conn_id += 1; - Self::on_connection(&mut tasks, map.clone(), conn_id, conn); - }, - None => break, - } + Some(conn) = rx.recv() => { + conn_id += 1; + Self::on_connection(&mut tasks, map.clone(), conn_id, conn); } Some(res) = tasks.join_next(), if !tasks.is_empty() => { res.expect("conn close task panicked"); } + else => break, } } @@ -410,41 +390,38 @@ mod remote_map { .insert(conn_id, conn.handle.clone()); } - // Track connection closing to clear up the map. - tasks.spawn({ - let remote_id = conn.remote_id; - let handle = conn.handle.clone(); - let map = map.clone(); - async move { - handle.closed().await; - { - let mut inner = map.write().expect("poisoned"); - let info = inner.entry(remote_id).or_default(); - info.connections.remove(&conn_id); - info.aggregate.last_update = SystemTime::now(); - } - } - .instrument(tracing::Span::current()) - }); - // Track path changes to update stats aggregate. - if let Some(watcher) = conn.handle.upgrade().map(|c| c.paths()) { - let remote_id = conn.remote_id; - tasks.spawn({ + if let Some(mut path_events) = conn.handle.upgrade().map(|conn| conn.path_events()) { + tasks.spawn( async move { - let mut path_updates = watcher.stream(); - while let Some(paths) = path_updates.next().await { - { - let mut inner = map.write().expect("poisoned"); - let info = inner.entry(remote_id).or_default(); - for path in paths { - info.aggregate.update(&path); + while let Some(event) = path_events.next().await { + let mut inner = map.write().expect("poisoned"); + let info = inner.entry(conn.remote_id).or_default(); + match event { + PathEvent::Closed { + remote_addr, + last_stats, + .. + } => { + info.aggregate.update(remote_addr, *last_stats); + } + _ => { + if let Some(conn) = conn.handle.upgrade() { + for path in conn.paths().iter() { + info.aggregate + .update(path.remote_addr().clone(), path.stats()); + } + } } } } + let mut inner = map.write().expect("poisoned"); + let info = inner.entry(conn.remote_id).or_default(); + info.connections.remove(&conn_id); + info.aggregate.last_update = SystemTime::now(); } - .instrument(tracing::Span::current()) - }); + .instrument(tracing::Span::current()), + ); } } @@ -459,8 +436,10 @@ mod remote_map { let mut inner = map.write().expect("poisoned"); inner.retain(|_remote, info| { info.is_active() - || now.duration_since(info.aggregate().last_update).unwrap() - < retention_time + || now + .duration_since(info.aggregate().last_update) + .map(|age| age < retention_time) + .unwrap_or(true) }); } } diff --git a/iroh/examples/transfer.rs b/iroh/examples/transfer.rs index 01cfbcd3a55..a5d56f70ee6 100644 --- a/iroh/examples/transfer.rs +++ b/iroh/examples/transfer.rs @@ -40,7 +40,6 @@ use indicatif::HumanBytes; use ipnet::{Ipv4Net, Ipv6Net}; use iroh::{ Endpoint, EndpointAddr, EndpointId, RelayMap, RelayMode, RelayUrl, SecretKey, TransportAddr, - Watcher, address_lookup::{ AddrFilter, dns::DnsAddressLookup, @@ -48,16 +47,17 @@ use iroh::{ }, dns::{DnsResolver, N0_DNS_ENDPOINT_ORIGIN_PROD, N0_DNS_ENDPOINT_ORIGIN_STAGING}, endpoint::{ - BindOpts, Connection, ConnectionError, PathId, PathWatcher, QuicTransportConfig, - RecvStream, SendStream, VarInt, WriteError, presets, + BindOpts, Connection, ConnectionError, PathEvent, PathId, QuicTransportConfig, RecvStream, + SendStream, VarInt, WriteError, presets, }, }; use n0_error::{Result, StackResultExt, StdResultExt, anyerr, ensure_any}; -use n0_future::{stream::StreamExt, task::AbortOnDropHandle}; +use n0_future::StreamExt; use postcard::experimental::max_size::MaxSize; use serde::{Deserialize, Serialize, Serializer}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, + task::JoinHandle, time::timeout, }; use tracing::{Instrument, Span, debug, error, info, info_span, instrument, warn}; @@ -644,8 +644,8 @@ async fn provide(endpoint: &Endpoint, output: Output) -> Result<()> { async fn handle_connection(conn: Connection, output: Output) { let start = Instant::now(); let remote_id = conn.remote_id(); - let watcher = conn.paths(); - let _guard = watch_conn_type(&conn, Some(remote_id), output); + // Spawn a background task that prints connection type changes and collects stats of all paths. + let stats_task = spawn_path_watcher(conn.clone(), Some(remote_id), output); // Accept incoming streams in a loop until the connection is closed by the remote. let close_reason = loop { @@ -653,10 +653,9 @@ async fn handle_connection(conn: Connection, output: Output) { Ok(streams) => streams, Err(err) => break err, }; - let conn = conn.clone(); tokio::spawn( async move { - if let Err(err) = handle_request(&conn, send, recv, output).await { + if let Err(err) = handle_request(remote_id, send, recv, output).await { warn!("[{}] Request failed: {err:#}", remote_id.fmt_short()); } } @@ -677,12 +676,12 @@ async fn handle_connection(conn: Connection, output: Output) { duration: start.elapsed(), }, ); - output.emit_with_remote(remote_id, PathStats::from_watcher(watcher)); + stats_task.await.expect("stats task panicked"); } #[instrument("handle", skip_all, fields(id=send.id().index()))] async fn handle_request( - conn: &Connection, + remote_id: EndpointId, send: SendStream, mut recv: RecvStream, output: Output, @@ -690,15 +689,15 @@ async fn handle_request( let request = Request::read(&mut recv) .await .context("failed to read request")?; - output.emit_with_remote(conn.remote_id(), HandleRequest { request: &request }); + output.emit_with_remote(remote_id, HandleRequest { request: &request }); match request { Request::Download(length) => { let stats = send_data(send, recv, length).await?; - output.emit_with_remote(conn.remote_id(), UploadComplete { stats }); + output.emit_with_remote(remote_id, UploadComplete { stats }); } Request::Upload => { let stats = drain_stream(recv, send, None).await?; - output.emit_with_remote(conn.remote_id(), DownloadComplete { stats }); + output.emit_with_remote(remote_id, DownloadComplete { stats }); } } Ok(()) @@ -714,15 +713,13 @@ async fn fetch( // Attempt to connect, over the given ALPN. Returns a connection. let start = Instant::now(); let conn = endpoint.connect(remote_addr, TRANSFER_ALPN).await?; - let conn_info = conn.weak_handle(); let remote_id = conn.remote_id(); output.emit(Connected { remote_id, duration: start.elapsed(), }); - let watcher = conn.paths(); - // Spawn a background task that prints connection type changes. Will be aborted on drop. - let _guard = watch_conn_type(&conn, None, output); + // Spawn a background task that prints connection type changes and collects stats of all paths. + let stats_task = spawn_path_watcher(conn.clone(), None, output); output.emit(StartRequest { mode, length }); // Perform requests depending on the request mode. @@ -758,6 +755,11 @@ async fn fetch( _ = tokio::signal::ctrl_c() => Err(anyerr!("Cancelled")) }; + // If the connection hasn't been closed above, close it now. + if conn.close_reason().is_none() { + conn.close(0u32.into(), b"shutdown"); + } + let error = conn .close_reason() .filter(|reason| !matches!(reason, ConnectionError::LocallyClosed)) @@ -767,12 +769,7 @@ async fn fetch( duration: start.elapsed(), }); - // Stats are collected by the paths watcher, so we do not look at the stats returned by - // this call. It is however the only API we currently have to tell us when the - // connection is drained and the stats will no longer change. - conn_info.closed().await; - output.emit(PathStats::from_watcher(watcher)); - + stats_task.await.expect("stats task panicked"); res } @@ -929,41 +926,47 @@ fn parse_byte_size(s: &str) -> std::result::Result { cfg.parse_size(s) } -fn watch_conn_type( - conn: &Connection, +fn spawn_path_watcher( + conn: Connection, remote_id: Option, output: Output, -) -> AbortOnDropHandle<()> { - let print = move |path: SelectedPath| { - let event = ConnectionTypeChanged { path }; - if let Some(remote_id) = remote_id { - output.emit_with_remote(remote_id, event) - } else { - output.emit(event) - } +) -> JoinHandle<()> { + let print = move |conn: &Connection| { + let path = match conn.paths().selected() { + Some(p) => SelectedPath::Selected { + id: p.id(), + addr: p.remote_addr().clone(), + rtt: p.rtt(), + }, + None => SelectedPath::None, + }; + output.emit_maybe_remote(remote_id, ConnectionTypeChanged { path }); }; - let mut stream = conn.paths().stream(); - let task = tokio::task::spawn(async move { - let mut previous = None; - while let Some(paths) = stream.next().await { - if let Some(path) = paths.iter().find(|p| p.is_selected()) { - // We can get path updates without the selected path changing. We don't want to log again in that case. - if Some(path) == previous.as_ref() { - continue; + tokio::spawn(async move { + let mut events = conn.path_events(); + print(&conn); + let mut paths = Vec::new(); + while let Some(event) = events.next().await { + match event { + PathEvent::Selected { .. } => print(&conn), + PathEvent::Closed { + id, + remote_addr, + last_stats, + } => { + paths.push(PathData { + id, + remote_addr, + rtt: last_stats.rtt, + bytes_sent: last_stats.udp_tx.bytes, + bytes_recv: last_stats.udp_rx.bytes, + }); } - print(SelectedPath::Selected { - id: path.id(), - addr: path.remote_addr().clone(), - rtt: path.rtt().expect("conn is not dropped"), - }); - previous = Some(path.clone()); - } else { - output.emit(SelectedPath::None); - previous = None; + _ => {} } } - }); - AbortOnDropHandle::new(task) + output.emit_maybe_remote(remote_id, PathStats { paths }); + }) } fn parse_ipv4_net(s: &str) -> Result<(SocketAddrV4, u8)> { @@ -1039,6 +1042,13 @@ impl Output { } } + fn emit_maybe_remote(&self, remote: Option, event: impl Serialize + fmt::Display) { + match remote { + None => self.emit(event), + Some(remote) => self.emit_with_remote(remote, event), + } + } + fn emit_if_json(&self, event: &impl Serialize) { if matches!(self.mode, OutputMode::Json) { println!( @@ -1102,13 +1112,11 @@ enum SelectedPath { impl fmt::Display for SelectedPath { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match &self { + match self { Self::Selected { addr, rtt, id } => { write!(f, "{addr:?} [id:{id}] (RTT: {})", fmt_duration(*rtt)) } - Self::None => { - write!(f, "none") - } + Self::None => write!(f, "none"), } } } @@ -1165,26 +1173,6 @@ struct PathStats { paths: Vec, } -impl PathStats { - fn from_watcher(mut watcher: PathWatcher) -> Self { - let list = watcher - .get() - .iter() - .filter_map(|info| { - let stats = info.stats()?; - Some(PathData { - id: info.id(), - remote_addr: info.remote_addr().clone(), - rtt: stats.rtt, - bytes_sent: stats.udp_tx.bytes, - bytes_recv: stats.udp_rx.bytes, - }) - }) - .collect(); - PathStats { paths: list } - } -} - impl fmt::Display for PathStats { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Path stats:")?; diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index fb0a4953876..47b05020d95 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -39,8 +39,8 @@ use self::hooks::EndpointHooksList; pub use super::socket::{ BindError, DirectAddr, DirectAddrType, remote_map::{ - PathInfo, PathInfoList, PathInfoListIter, PathWatcher, RemoteInfo, TransportAddrInfo, - TransportAddrUsage, + Path, PathEvent, PathEventStream, PathList, PathListIter, PathListStream, RemoteInfo, + TransportAddrInfo, TransportAddrUsage, }, }; #[cfg(wasm_browser)] @@ -1966,7 +1966,7 @@ mod tests { address_lookup::memory::MemoryLookup, endpoint::{ ApplicationClose, BindError, BindOpts, ConnectError, ConnectOptions, - ConnectWithOptsError, Connection, ConnectionError, PathWatcher, presets, + ConnectWithOptsError, Connection, ConnectionError, PathEvent, PathEventStream, presets, }, protocol::{AcceptError, ProtocolHandler, Router}, test_utils::{QlogFileGroup, run_relay_server, run_relay_server_with}, @@ -2366,7 +2366,7 @@ mod tests { let conn = ep.connect(dst, TEST_ALPN).await?; let mut send = conn.open_uni().await.anyerr()?; send.write_all(b"hello").await.anyerr()?; - let mut paths = conn.paths().stream(); + let mut paths = conn.path_updates(); info!("Waiting for direct connection"); while let Some(infos) = paths.next().await { info!(?infos, "new PathInfos"); @@ -2471,7 +2471,7 @@ mod tests { let conn = ep.connect(dst, TEST_ALPN).await?; let mut send = conn.open_uni().await.anyerr()?; send.write_all(b"hello").await.anyerr()?; - let mut paths = conn.paths().stream(); + let mut paths = conn.path_updates(); info!("Waiting for connection"); 'outer: while let Some(infos) = paths.next().await { info!(?infos, "new PathInfos"); @@ -2485,6 +2485,7 @@ mod tests { } } info!("Have relay connection"); + send.write_all(b"close please").await.anyerr()?; send.finish().anyerr()?; let res = conn.closed().await; @@ -2571,11 +2572,12 @@ mod tests { // We should be connected via IP, because it is faster than the relay server. // TODO: Maybe not panic if this is not true? - let path_info = conn.paths().get(); + + let path_info = conn.paths(); assert_eq!(path_info.len(), 1); assert!(path_info.iter().next().unwrap().is_ip()); - let mut paths = conn.paths().stream(); + let mut paths = conn.path_updates(); time::timeout(Duration::from_secs(5), async move { while let Some(infos) = paths.next().await { info!(?infos, "new PathInfos"); @@ -2622,7 +2624,7 @@ mod tests { // Wait for a relay connection to be added. Client does all the asserting here, // we just want to wait so we get to see all the mechanics of the connection // being added on this side too. - let mut paths = conn.paths().stream(); + let mut paths = conn.path_updates(); time::timeout(Duration::from_secs(5), async move { while let Some(infos) = paths.next().await { info!(?infos, "new PathInfos"); @@ -3488,17 +3490,19 @@ mod tests { )); let transfer_size = 1_000_000; - fn collect_stats(mut watcher: PathWatcher) -> BTreeMap { - watcher - .get() - .iter() - .map(|info| { - ( - info.remote_addr().clone(), - info.stats().expect("conn is not yet dropped"), - ) - }) - .collect() + async fn collect_stats(mut events: PathEventStream) -> BTreeMap { + let mut stats = BTreeMap::new(); + while let Some(event) = events.next().await { + if let PathEvent::Closed { + remote_addr, + last_stats, + id: _, + } = event + { + stats.insert(remote_addr, *last_stats); + } + } + stats } let client = Endpoint::builder(presets::Minimal) @@ -3518,25 +3522,25 @@ mod tests { let server_task = tokio::spawn(async move { let incoming = server.accept().await.anyerr()?; let conn = incoming.await.anyerr()?; - let watcher = conn.paths(); + let stats_task = tokio::spawn(collect_stats(conn.path_events())); let (mut send, mut recv) = conn.accept_bi().await.anyerr()?; let msg = recv.read_to_end(transfer_size).await.anyerr()?; send.write_all(&msg).await.anyerr()?; send.finish().anyerr()?; conn.closed().await; - let stats = collect_stats(watcher); + let stats = stats_task.await.expect("stats task panicked"); Ok::<_, Error>(stats) }); let conn = client.connect(server_addr, TEST_ALPN).await?; - let watcher = conn.paths(); + let client_stats_task = tokio::spawn(collect_stats(conn.path_events())); let (mut send, mut recv) = conn.open_bi().await.anyerr()?; send.write_all(&vec![42u8; transfer_size]).await.anyerr()?; send.finish().anyerr()?; recv.read_to_end(transfer_size).await.anyerr()?; conn.close(0u32.into(), b"thanks, bye!"); client.close().await; - let client_stats = collect_stats(watcher); + let client_stats = client_stats_task.await.expect("stats task panicked"); let server_stats = server_task.await.anyerr()??; info!("client stats: {client_stats:#?}"); diff --git a/iroh/src/endpoint/connection.rs b/iroh/src/endpoint/connection.rs index 161e81191b5..95939043b8f 100644 --- a/iroh/src/endpoint/connection.rs +++ b/iroh/src/endpoint/connection.rs @@ -47,7 +47,7 @@ use crate::{ }, socket::{ RemoteStateActorStoppedError, - remote_map::{PathWatchable, PathWatcher}, + remote_map::{PathEventStream, PathList, PathListStream, PathStateReceiver}, transports, }, }; @@ -769,7 +769,7 @@ pub struct Connection { #[derive(Debug, Clone)] pub struct HandshakeCompletedData { info: StaticInfo, - paths: PathWatchable, + paths: PathStateReceiver, } /// Static info from a completed TLS handshake. @@ -1102,39 +1102,53 @@ impl Connection { self.data.info.endpoint_id } - /// Returns a [`Watcher`] for the network paths of this connection. + /// Returns the currently open network paths for this connection. /// - /// A connection can have several network paths to the remote endpoint, commonly there - /// will be a path via the relay server and a holepunched path. + /// A connection typically has one path via the relay server and, + /// once holepunching succeeds, a direct path. The returned + /// [`PathList`] is a snapshot taken at call time: it does not + /// reflect later changes, and it does not include paths that have + /// already closed. /// - /// Returns a [`PathWatcher`], which implements the [`Watcher`] trait. The watcher is updated - /// whenever a path is opened or closed, or when the path selected for transmission changes - /// (see [`PathInfo::is_selected`]). + /// To observe changes over time, see [`Connection::path_updates`] + /// for a stream of [`PathList`] snapshots and + /// [`Connection::path_events`] for individual [`PathEvent`]s. /// - /// The [`PathInfoList`] returned from the watcher contains a [`PathInfo`] for each - /// network path. + /// [`PathEvent`]: crate::endpoint::PathEvent + pub fn paths(&self) -> PathList<'_> { + self.data.paths.get(&self.inner) + } + + /// Returns a stream of [`PathList`] snapshots for this connection. + /// + /// Yields the current snapshot on the first poll, and a fresh + /// snapshot whenever the open paths or the selected path change. + /// Ends when the connection closes. + /// + /// The stream borrows this [`Connection`]. To consume it from a + /// spawned task, move a [`Connection`] clone into the task and + /// call this method inside. + pub fn path_updates(&self) -> PathListStream<'_> { + self.data.paths.updates(&self.inner) + } + + /// Returns a stream of [`PathEvent`]s for this connection. /// - /// As long as a [`PathWatcher`] is alive, the list of paths will only grow. If paths - /// are closed, they will be marked as closed (see [`PathInfo::is_closed`]) but will - /// not be removed from the list of paths. This allows to reliably retrieve stats for - /// closed paths. + /// Each event reports one of: a path opened, a path closed (with + /// final per-path statistics), the selected transmission path + /// changed, or the consumer fell behind. The stream ends when the + /// connection closes. It does not borrow this [`Connection`] and + /// may be moved into a spawned task. /// - /// A [`PathWatcher`] does not keep the [`Connection`] itself alive. If all references to - /// a connection are dropped, the [`PathWatcher`] will start to return an error when - /// updating. Its last value may still be used - note however that accessing - /// stats for a path via [`PathInfo::stats`] returns `None` if all references to a - /// [`Connection`] have been dropped. To reliably access path stats when a connection closes, - /// wait for [`Connection::closed`] and then call [`Connection::paths`] and directly - /// iterate over the path stats while the [`Connection`] struct is still in scope. + /// If the consumer does not poll fast enough, the stream yields a + /// single [`PathEvent::Lagged`]; the current state of the open + /// paths and the selected path remains recoverable via + /// [`Connection::paths`]. /// - /// [`PathInfoList`]: crate::endpoint::PathInfoList - /// [`PathInfo`]: crate::endpoint::PathInfo - /// [`PathInfo::is_selected`]: crate::endpoint::PathInfo::is_selected - /// [`PathInfo::is_closed`]: crate::endpoint::PathInfo::is_closed - /// [`PathInfo::stats`]: crate::endpoint::PathInfo::stats - /// [`Watcher`]: crate::Watcher - pub fn paths(&self) -> PathWatcher { - self.data.paths.watch() + /// [`PathEvent`]: crate::endpoint::PathEvent + /// [`PathEvent::Lagged`]: crate::endpoint::PathEvent::Lagged + pub fn path_events(&self) -> PathEventStream { + self.data.paths.events() } /// Returns the side of the connection (client or server). @@ -1287,16 +1301,15 @@ mod tests { use iroh_base::{EndpointAddr, SecretKey}; use iroh_relay::tls::CaRootsConfig; use n0_error::{Result, StackResultExt, StdResultExt}; - use n0_future::StreamExt; + use n0_future::{Stream, StreamExt}; use n0_tracing_test::traced_test; - use n0_watcher::Watcher; use rand::{RngExt, SeedableRng}; use tracing::{Instrument, error_span, info, info_span, trace_span}; use super::Endpoint; use crate::{ RelayMode, - endpoint::{ConnectOptions, Incoming, PathInfo, PathInfoList, ZeroRttStatus, presets}, + endpoint::{ConnectOptions, Incoming, PathList, ZeroRttStatus, presets}, test_utils::run_relay_server, }; @@ -1540,21 +1553,20 @@ mod tests { async { server.accept().await.unwrap().await.unwrap() } ); info!("connected"); - let mut paths_client = conn_client.paths().stream(); - let mut paths_server = conn_server.paths().stream(); + + let mut paths_client = conn_client.path_updates(); + let mut paths_server = conn_server.path_updates(); /// Advances the path stream until at least one IP and one relay paths are available. /// /// Panics if the path stream finishes before that happens. - async fn wait_for_paths( - stream: &mut n0_watcher::Stream + Unpin>, - ) { + async fn wait_for_paths(mut stream: impl Send + Unpin + Stream>) { loop { let paths = stream.next().await.expect("paths stream ended"); info!(?paths, "paths"); if paths.len() >= 2 - && paths.iter().any(PathInfo::is_relay) - && paths.iter().any(PathInfo::is_ip) + && paths.iter().any(|p| p.is_relay()) + && paths.iter().any(|p| p.is_ip()) { info!("break"); return; @@ -1591,7 +1603,7 @@ mod tests { .await .expect("client paths watcher did not close within 1s of connection close"); tokio::time::timeout(Duration::from_nanos(1), async { - while paths_client.next().await.is_some() {} + while paths_server.next().await.is_some() {} }) .await .expect("server paths watcher did not close within 1s of connection close"); diff --git a/iroh/src/socket.rs b/iroh/src/socket.rs index 592dec85130..52b73e9572a 100644 --- a/iroh/src/socket.rs +++ b/iroh/src/socket.rs @@ -76,7 +76,7 @@ use crate::{ runtime::Runtime, socket::{ concurrent_read_map::ReadOnlyMap, - remote_map::{MappedAddrs, PathWatchable, RemoteInfo}, + remote_map::{MappedAddrs, PathStateReceiver, RemoteInfo}, transports::{HomeRelayWatch, HomeRelayWatcher, TransportBiasMap}, }, tls::{ @@ -1342,14 +1342,14 @@ impl EndpointInner { /// The actor is responsible for holepunching and opening additional paths to this /// connection. /// - /// Returns a future that resolves to [`PathWatchable`]. + /// Returns a future that resolves to a [`PathStateReceiver`] for the new connection. /// - /// The returned future is `'static`, so it can be stored without being liftetime-bound to `&self`. + /// The returned future is `'static`, so it can be stored without being lifetime-bound to `&self`. pub(crate) fn register_connection( &self, remote: EndpointId, conn: WeakConnectionHandle, - ) -> impl Future> + Send + 'static + ) -> impl Future> + Send + 'static { let (tx, rx) = oneshot::channel(); let sender = self.actor_sender.clone(); @@ -1377,7 +1377,7 @@ enum ActorMessage { AddConnection( EndpointId, WeakConnectionHandle, - oneshot::Sender, + oneshot::Sender, ), /// Re-evaluate direct addresses, e.g. after configured external addresses changed. DirectAddrRefresh, @@ -2206,9 +2206,9 @@ mod tests { let stats = conn.stats(); info!("stats: {:#?}", stats); if matches!(loss, ExpectedLoss::AlmostNone) { - for info in conn.paths().get().iter() { + for info in conn.paths().iter() { assert!( - info.stats().unwrap().lost_packets < 10, + info.stats().lost_packets < 10, "[receiver] path {:?} should not loose many packets", info.remote_addr() ); @@ -2260,9 +2260,9 @@ mod tests { let stats = conn.stats(); info!("stats: {:#?}", stats); if matches!(loss, ExpectedLoss::AlmostNone) { - for info in conn.paths().get().iter() { + for info in conn.paths().iter() { assert!( - info.stats().unwrap().lost_packets < 10, + info.stats().lost_packets < 10, "[sender] path {:?} should not loose many packets", info.remote_addr() ); diff --git a/iroh/src/socket/remote_map.rs b/iroh/src/socket/remote_map.rs index c28ebbfce4e..462c36e1c0d 100644 --- a/iroh/src/socket/remote_map.rs +++ b/iroh/src/socket/remote_map.rs @@ -12,12 +12,12 @@ use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tracing::{Span, debug, error}; -pub(crate) use self::remote_state::PathWatchable; +pub(crate) use self::remote_state::PathStateReceiver; use self::remote_state::RemoteStateActor; pub(super) use self::remote_state::RemoteStateMessage; pub use self::remote_state::{ - PathInfo, PathInfoList, PathInfoListIter, PathWatcher, RemoteInfo, TransportAddrInfo, - TransportAddrUsage, + Path, PathEvent, PathEventStream, PathList, PathListIter, PathListStream, RemoteInfo, + TransportAddrInfo, TransportAddrUsage, }; use super::{ DirectAddr, Metrics as SocketMetrics, @@ -241,7 +241,7 @@ impl RemoteMap { &mut self, remote: EndpointId, conn: noq::WeakConnectionHandle, - tx: oneshot::Sender, + tx: oneshot::Sender, ) -> Result<(), RemoteStateActorStoppedError> { let actor = self.remote_state_actor(remote); actor diff --git a/iroh/src/socket/remote_map/remote_state.rs b/iroh/src/socket/remote_map/remote_state.rs index c362008ecf0..cc7774e8532 100644 --- a/iroh/src/socket/remote_map/remote_state.rs +++ b/iroh/src/socket/remote_map/remote_state.rs @@ -14,18 +14,18 @@ use n0_future::{ task::JoinSet, time::{self, Duration, Instant}, }; -use n0_watcher::{Watchable, Watcher}; -use noq::{ConnectionError, WeakConnectionHandle}; -use noq_proto::{PathError, PathEvent, PathId, n0_nat_traversal}; +use n0_watcher::Watcher; +use noq::{Closed, WeakConnectionHandle}; +use noq_proto::{PathError, PathEvent as NoqPathEvent, PathId, n0_nat_traversal}; use rustc_hash::FxHashMap; use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tracing::{Instrument, Level, Span, debug, error, event, info_span, instrument, trace, warn}; use self::path_state::RemotePathState; -pub(crate) use self::path_watcher::PathWatchable; +pub(crate) use self::path_watcher::{PathStateReceiver, PathStateSender}; pub use self::{ - path_watcher::{PathInfo, PathInfoList, PathInfoListIter, PathWatcher}, + path_watcher::{Path, PathEvent, PathEventStream, PathList, PathListIter, PathListStream}, remote_info::{RemoteInfo, TransportAddrInfo, TransportAddrUsage}, }; use super::Source; @@ -81,7 +81,7 @@ const RTT_SWITCHING_MIN_IP: Duration = Duration::from_millis(5); /// The connection is identified using [`ConnId`]. The event `Err` variant happens when the /// actor has lagged processing the events, which is rather critical for us. type PathEvents = MergeUnbounded< - Pin)> + Send + Sync>>, + Pin)> + Send + Sync>>, >; /// A stream of events of announced NAT traversal candidate addresses for all connections. @@ -147,7 +147,7 @@ pub(super) struct RemoteStateActor { /// holepunching regularly. /// /// We only select a path once the path is functional in Noq. - selected_path: Watchable>, + selected_path: Option, /// Time at which we should schedule the next holepunch attempt. scheduled_holepunch: Option, /// When to next attempt opening paths in [`Self::pending_open_paths`]. @@ -283,8 +283,8 @@ impl RemoteStateActor { trace!(?id, ?evt, "remote addrs updated, triggering holepunching"); self.trigger_holepunching(); } - Some((conn_id, reason)) = self.connections_close.next(), if !self.connections_close.is_empty() => { - self.handle_connection_close(conn_id, reason); + Some((conn_id, closed)) = self.connections_close.next(), if !self.connections_close.is_empty() => { + self.handle_connection_close(conn_id, closed); } res = self.local_direct_addrs.updated() => { if let Err(n0_watcher::Disconnected) = res { @@ -378,7 +378,7 @@ impl RemoteStateActor { // though we might not have a relay transport or ip-capable transport set up. // So these errors must not be fatal for this actor (or even this operation). - if let Some(addr) = self.selected_path.get() { + if let Some(addr) = self.selected_path.as_ref() { trace!(?addr, "sending datagram to selected path"); if let Err(err) = send_datagram(&mut sender, addr.clone(), transmit).await { @@ -422,9 +422,9 @@ impl RemoteStateActor { fn handle_msg_add_connection( &mut self, handle: WeakConnectionHandle, - tx: oneshot::Sender, + tx: oneshot::Sender, ) { - let path_watchable = PathWatchable::new(self.selected_path.clone()); + let (path_state_sender, path_state_receiver) = PathStateSender::new(); if let Some(conn) = handle.upgrade() { self.metrics.num_conns_opened.inc(); // Remove any conflicting stable_ids from the local state. @@ -454,7 +454,7 @@ impl RemoteStateActor { .entry(conn_id) .insert_entry(ConnectionState { handle: handle.clone(), - path_watchable: path_watchable.clone(), + path_state: path_state_sender, paths: Default::default(), has_been_direct: false, }) @@ -506,8 +506,7 @@ impl RemoteStateActor { } self.trigger_holepunching(); } - - tx.send(path_watchable).ok(); + tx.send(path_state_receiver).ok(); } /// Handles [`RemoteStateMessage::ResolveRemote`]. @@ -544,21 +543,22 @@ impl RemoteStateActor { } } - fn handle_connection_close(&mut self, conn_id: ConnId, reason: ConnectionError) { + fn handle_connection_close(&mut self, conn_id: ConnId, closed: Closed) { event!( target: "iroh::_events::conn::closed", Level::DEBUG, %conn_id, remote_id = %self.endpoint_id.fmt_short(), - ?reason, + reason=?closed.reason, ); - if self.connections.remove(&conn_id).is_some() { + if let Some(conn_state) = self.connections.remove(&conn_id) { self.metrics.num_conns_closed.inc(); + conn_state.path_state.close(closed); } if self.connections.is_empty() { trace!("last connection closed - clearing selected_path"); - self.selected_path.set(None).ok(); + self.selected_path = None; } } @@ -567,7 +567,7 @@ impl RemoteStateActor { /// Does not start Address Lookup if we have a selected path or if Address Lookup is /// currently running. fn trigger_address_lookup(&mut self) { - if self.selected_path.get().is_some() || self.address_lookup_stream.is_some() { + if self.selected_path.is_some() || self.address_lookup_stream.is_some() { return; } let stream = self.address_lookup.resolve(self.endpoint_id); @@ -890,7 +890,7 @@ impl RemoteStateActor { } #[instrument(skip(self))] - fn handle_path_event(&mut self, conn_id: ConnId, event: Result) { + fn handle_path_event(&mut self, conn_id: ConnId, event: Result) { let Ok(event) = event else { warn!("missed a PathEvent, RemoteStateActor lagging"); // TODO: Is it possible to recover using the sync APIs to figure out what the @@ -907,7 +907,7 @@ impl RemoteStateActor { }; trace!("path event"); match event { - PathEvent::Opened { id: path_id } => { + NoqPathEvent::Opened { id: path_id } => { let Some(path) = conn.path(path_id) else { trace!("path open event for unknown path"); return; @@ -936,9 +936,9 @@ impl RemoteStateActor { self.select_path(); } - PathEvent::Abandoned { id, .. } => { + NoqPathEvent::Abandoned { id, reason } => { // Remove abandoned path from the conn state. - let Some(path_remote) = conn_state.remove_path(&id) else { + let Some(path_remote) = conn_state.remove_path(&id, &conn) else { debug!(%id, "path not in path_id_map"); return; }; @@ -952,6 +952,7 @@ impl RemoteStateActor { ?path_remote, %conn_id, path_id = ?id, + ?reason ); // If one connection abandons a path, close it on all connections. @@ -981,10 +982,10 @@ impl RemoteStateActor { // If the remote closed our selected path, select a new one. self.select_path(); } - PathEvent::Discarded { id, path_stats } => { + NoqPathEvent::Discarded { id, path_stats } => { trace!(%id, ?path_stats, "path discarded"); } - PathEvent::RemoteStatus { .. } | PathEvent::ObservedAddr { .. } => { + NoqPathEvent::RemoteStatus { .. } | NoqPathEvent::ObservedAddr { .. } => { // Nothing to do for these events. } } @@ -1027,13 +1028,13 @@ impl RemoteStateActor { }) .collect(); - let current_path = self.selected_path.get(); - let selected_path = select_best_path(path_rtts, ¤t_path); + let current_path = self.selected_path.as_ref(); + let selected_path = select_best_path(path_rtts, current_path); // Apply our new path if let Some((addr, rtt)) = selected_path { - let prev = self.selected_path.set(Some(addr.clone())); - if prev.is_ok() { + let prev = self.selected_path.replace(addr.clone()); + if prev.as_ref() != Some(&addr) { event!( target: "iroh::_events::path::selected", Level::DEBUG, @@ -1045,6 +1046,9 @@ impl RemoteStateActor { } self.open_path(&addr); self.close_redundant_paths(&addr); + for conn in self.connections.values() { + conn.path_state.record_selected(addr.clone().into()); + } } else { trace!(?current_path, "keeping current path"); } @@ -1059,7 +1063,7 @@ impl RemoteStateActor { /// avoid the client and server selecting different paths and accidentally closing all /// paths. fn close_redundant_paths(&mut self, selected_path: &transports::Addr) { - debug_assert_eq!(self.selected_path.get().as_ref(), Some(selected_path)); + debug_assert_eq!(self.selected_path.as_ref(), Some(selected_path)); for (conn_id, conn_state) in self.connections.iter() { for (path_id, path_remote) in conn_state @@ -1139,7 +1143,7 @@ impl RemoteStateActor { /// continued to be used. fn select_best_path( all_paths: FxHashMap, - current_path: &Option, + current_path: Option<&transports::Addr>, ) -> Option<(transports::Addr, Duration)> { // Determine the best new path according to sort_key. // If there is no path, return None. @@ -1202,7 +1206,7 @@ pub(crate) enum RemoteStateMessage { /// needed, any new paths discovered via holepunching will be added. And closed paths /// will be removed etc. #[debug("AddConnection(..)")] - AddConnection(WeakConnectionHandle, oneshot::Sender), + AddConnection(WeakConnectionHandle, oneshot::Sender), /// Asks if there is any possible path that could be used. /// /// This adds the provided transport addresses to the list of potential paths for this @@ -1258,8 +1262,12 @@ struct ConnId(usize); struct ConnectionState { /// Weak handle to the connection. handle: WeakConnectionHandle, - /// The information we publish to users about the paths used in this connection. - path_watchable: PathWatchable, + /// Writer-side handle for the connection's path observation state. + /// + /// The matching [`PathStateReceiver`] is held by the [`Connection`]. + /// + /// [`Connection`]: crate::endpoint::Connection + path_state: PathStateSender, /// The open paths that exist on this connection. paths: FxHashMap, /// Whether this connection has ever had a direct path. @@ -1268,12 +1276,6 @@ struct ConnectionState { has_been_direct: bool, } -impl Drop for ConnectionState { - fn drop(&mut self) { - self.path_watchable.close(); - } -} - impl ConnectionState { /// Tracks an open path for the connection. fn add_open_path( @@ -1293,16 +1295,23 @@ impl ConnectionState { } self.paths.insert(path_id, remote.clone()); - if let Some(conn) = self.handle.upgrade() { - self.path_watchable.insert(&conn, path_id, remote.into()); + if let Some(conn) = self.handle.upgrade() + && let Some(path) = conn.path(path_id) + { + let handle = path.weak_handle(); + self.path_state.record_opened(handle, remote.into()); } } /// Removes a path from this connection. - fn remove_path(&mut self, path_id: &PathId) -> Option { - let addr = self.paths.remove(path_id); - self.path_watchable.set_abandoned(*path_id); - addr + fn remove_path( + &mut self, + path_id: &PathId, + conn: &noq::Connection, + ) -> Option { + let addr = self.paths.remove(path_id)?; + self.path_state.record_abandoned(*path_id, conn); + Some(addr) } } @@ -1334,11 +1343,11 @@ impl OnClosed { } impl Future for OnClosed { - type Output = (ConnId, ConnectionError); + type Output = (ConnId, Closed); fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { let closed = std::task::ready!(Pin::new(&mut self.inner).poll(cx)); - Poll::Ready((self.conn_id, closed.reason)) + Poll::Ready((self.conn_id, closed)) } } @@ -1421,7 +1430,7 @@ mod tests { paths.insert(v4(1), psd(TransportType::Primary, 10)); paths.insert(v6(1), psd_v6(TransportType::Primary, 10)); - let result = select_best_path(paths, &None); + let result = select_best_path(paths, None); assert!(result.is_some()); let (addr, _) = result.unwrap(); assert!(matches!(addr, transports::Addr::Ip(SocketAddr::V6(_)))); @@ -1431,7 +1440,7 @@ mod tests { paths.insert(v4(1), psd(TransportType::Primary, 10)); paths.insert(v6(1), psd_v6(TransportType::Primary, 12)); // 2ms slower, but 3ms bias - let result = select_best_path(paths, &None); + let result = select_best_path(paths, None); assert!(result.is_some()); let (addr, _) = result.unwrap(); assert!(matches!(addr, transports::Addr::Ip(SocketAddr::V6(_)))); @@ -1441,7 +1450,7 @@ mod tests { paths.insert(v4(1), psd(TransportType::Primary, 10)); paths.insert(v6(1), psd_v6(TransportType::Primary, 20)); // 10ms slower, exceeds 3ms bias - let result = select_best_path(paths, &None); + let result = select_best_path(paths, None); assert!(result.is_some()); let (addr, _) = result.unwrap(); assert!(matches!(addr, transports::Addr::Ip(SocketAddr::V4(_)))); @@ -1454,7 +1463,7 @@ mod tests { paths.insert(v4(1), psd(TransportType::Primary, 100)); // High RTT but Available paths.insert(relay(1), psd(TransportType::Backup, 10)); // Low RTT but Backup - let result = select_best_path(paths, &None); + let result = select_best_path(paths, None); assert!(result.is_some()); let (addr, _) = result.unwrap(); assert!(addr.is_ip()); @@ -1464,7 +1473,7 @@ mod tests { paths.insert(v4(1), psd(TransportType::Primary, 1000)); paths.insert(relay(1), psd(TransportType::Backup, 1)); - let result = select_best_path(paths, &None); + let result = select_best_path(paths, None); assert!(result.is_some()); let (addr, _) = result.unwrap(); assert!(addr.is_ip()); @@ -1479,7 +1488,7 @@ mod tests { paths.insert(v4(1), psd(TransportType::Primary, 20)); paths.insert(v4(2), psd(TransportType::Primary, 18)); - let result = select_best_path(paths, &Some(current.clone())); + let result = select_best_path(paths, Some(¤t)); assert!(result.is_none()); // Should keep current // Should NOT switch: new path is just under threshold (4ms < 5ms) @@ -1487,7 +1496,7 @@ mod tests { paths.insert(v4(1), psd(TransportType::Primary, 20)); paths.insert(v4(2), psd(TransportType::Primary, 16)); - let result = select_best_path(paths, &Some(current.clone())); + let result = select_best_path(paths, Some(¤t)); assert!(result.is_none()); // Should keep current // SHOULD switch: new path is exactly at threshold (5ms, condition is <=) @@ -1495,7 +1504,7 @@ mod tests { paths.insert(v4(1), psd(TransportType::Primary, 20)); paths.insert(v4(2), psd(TransportType::Primary, 15)); - let result = select_best_path(paths, &Some(current.clone())); + let result = select_best_path(paths, Some(¤t)); assert!(result.is_some()); let (addr, _) = result.unwrap(); assert_eq!(addr, v4(2)); @@ -1505,7 +1514,7 @@ mod tests { paths.insert(v4(1), psd(TransportType::Primary, 20)); paths.insert(v4(2), psd(TransportType::Primary, 14)); - let result = select_best_path(paths, &Some(current.clone())); + let result = select_best_path(paths, Some(¤t)); assert!(result.is_some()); let (addr, _) = result.unwrap(); assert_eq!(addr, v4(2)); @@ -1517,7 +1526,7 @@ mod tests { paths.insert(v4(1), psd(TransportType::Primary, 20)); paths.insert(v4(2), psd(TransportType::Primary, 10)); - let result = select_best_path(paths, &None); + let result = select_best_path(paths, None); assert!(result.is_some()); let (addr, _) = result.unwrap(); assert_eq!(addr, v4(2)); // Lower RTT wins @@ -1526,11 +1535,11 @@ mod tests { #[test] fn test_empty_paths_returns_none() { let paths: FxHashMap = FxHashMap::default(); - let result = select_best_path(paths, &None); + let result = select_best_path(paths, None); assert!(result.is_none()); let paths: FxHashMap = FxHashMap::default(); - let result = select_best_path(paths, &Some(v4(1))); + let result = select_best_path(paths, Some(&v4(1))); assert!(result.is_none()); } } diff --git a/iroh/src/socket/remote_map/remote_state/path_watcher.rs b/iroh/src/socket/remote_map/remote_state/path_watcher.rs index 7280137769e..e16ec689838 100644 --- a/iroh/src/socket/remote_map/remote_state/path_watcher.rs +++ b/iroh/src/socket/remote_map/remote_state/path_watcher.rs @@ -1,352 +1,513 @@ -use std::task::Poll; +//! Path observation for a [`Connection`]. +//! +//! [`Connection::paths`] returns a borrowed [`PathList`] with live +//! statistics. [`Connection::path_events`] returns a `'static` stream +//! of [`PathEvent`]s. Subscribing to the event stream before reading +//! the snapshot ensures any change that happens between the read and +//! the next poll is observed by the subscriber. +//! +//! Closed paths are not retained in [`PathList`]; their final +//! statistics arrive inline on [`PathEvent::Closed`]. +//! +//! # Internal structure +//! +//! [`PathStateSender`] (owned by the [`RemoteStateActor`]) and +//! [`PathStateReceiver`] (held by the [`Connection`]) share a +//! [`Mutex`]`<`[`State`]`>`, a [`Notify`], and a [`broadcast`] channel. +//! The receiver holds a [`WeakSender`]; when the actor drops the +//! sender, outstanding event streams end. +//! +//! [`Connection`]: crate::endpoint::Connection +//! [`Connection::paths`]: crate::endpoint::Connection::paths +//! [`Connection::path_events`]: crate::endpoint::Connection::path_events +//! [`RemoteStateActor`]: super::RemoteStateActor +//! [`WeakSender`]: broadcast::WeakSender + +use std::{ + pin::Pin, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; use iroh_base::TransportAddr; -use n0_future::time::Duration; -use n0_watcher::{Watchable, Watcher}; +use n0_future::{StreamExt, time::Duration}; use noq::WeakPathHandle; use noq_proto::PathId; use smallvec::SmallVec; +use tokio::sync::{Notify, broadcast, futures::Notified}; +use tokio_stream::{ + Stream, + wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}, +}; -use crate::{endpoint::PathStats, socket::transports}; +use crate::endpoint::PathStats; -/// List of [`PathInfo`] for the network paths of a [`Connection`]. -/// -/// This struct implements [`IntoIterator`]. -/// -/// [`Connection`]: crate::endpoint::Connection -#[derive(Debug, Clone, Eq, PartialEq, Default)] -pub struct PathInfoList(SmallVec<[PathInfo; 4]>); +/// Per-connection broadcast channel capacity for path events. +const BROADCAST_CAPACITY: usize = 8; -impl PathInfoList { - /// Returns an iterator over the path infos. - pub fn iter(&self) -> impl Iterator { - self.0.iter() - } +/// Lifecycle notifications for a transmission paths in a connection. +#[derive(Clone, Debug)] +#[non_exhaustive] +pub enum PathEvent { + /// A new network path was opened. + Opened { + /// Path identifier. + id: PathId, + /// Remote transport address. + remote_addr: TransportAddr, + }, + /// A network path was closed. + Closed { + /// Path identifier. + id: PathId, + /// Remote transport address. + remote_addr: TransportAddr, + /// Path statistics captured at close time. + last_stats: Box, + }, + /// The selected transmission path changed to a different open path. + Selected { + /// Path identifier of the newly selected path. + id: PathId, + /// Remote transport address of the newly selected path. + remote_addr: TransportAddr, + }, + /// Events were dropped before the subscriber received them. + /// + /// Yielded when the subscriber does not poll the stream fast + /// enough to keep up with the writer. The current set of open + /// paths and the selected path remain accessible via + /// [`Connection::paths`]. + /// + /// [`Connection::paths`]: crate::endpoint::Connection::paths + Lagged { + /// Number of events dropped since the last delivered event. + missed: u64, + }, +} - /// Returns `true` if the list is empty. - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } +#[derive(Clone, derive_more::Debug)] +#[debug("PathData({}, {})", self.handle.id(), self.remote_addr)] +struct PathData { + handle: WeakPathHandle, + remote_addr: TransportAddr, +} - /// Returns the number of paths. - pub fn len(&self) -> usize { - self.0.len() - } +#[derive(Default, Debug, Clone)] +struct State { + list: SmallVec<[PathData; 4]>, + selected: Option, + closed: bool, } -/// Iterator returned from [`PathInfoList::into_iter`]. #[derive(Debug)] -pub struct PathInfoListIter(smallvec::IntoIter<[PathInfo; 4]>); +struct Shared { + state: Mutex, + notify: Notify, +} -impl IntoIterator for PathInfoList { - type Item = PathInfo; - type IntoIter = PathInfoListIter; +/// The writer-side handle for a connection's path state. +/// +/// Owned by the [`RemoteStateActor`]; the only handle that mutates +/// state and emits events. When dropped, every outstanding +/// [`PathEventStream`] ends. +/// +/// [`RemoteStateActor`]: super::RemoteStateActor +#[derive(Debug)] +pub(crate) struct PathStateSender { + shared: Arc, + events: broadcast::Sender, +} - fn into_iter(self) -> Self::IntoIter { - PathInfoListIter(self.0.into_iter()) +impl PathStateSender { + /// Creates a sender/receiver pair sharing empty state. + pub(crate) fn new() -> (Self, PathStateReceiver) { + let (events, _) = broadcast::channel(BROADCAST_CAPACITY); + let shared = Arc::new(Shared { + state: Default::default(), + notify: Notify::new(), + }); + let receiver = PathStateReceiver { + shared: shared.clone(), + events: events.downgrade(), + }; + let sender = PathStateSender { shared, events }; + (sender, receiver) } -} -impl IntoIterator for PathWatcher { - type Item = PathInfo; - type IntoIter = PathInfoListIter; + /// Records a newly-opened path and emits [`PathEvent::Opened`]. + pub(crate) fn record_opened(&self, handle: WeakPathHandle, remote_addr: TransportAddr) { + let id = handle.id(); + { + let mut state = self.shared.state.lock().expect("poisoned"); + let entry = PathData { + handle, + remote_addr: remote_addr.clone(), + }; + match state.list.iter().position(|e| e.handle.id() == id) { + Some(idx) => state.list[idx] = entry, + None => state.list.push(entry), + } + } + self.shared.notify.notify_waiters(); + let _ = self.events.send(PathEvent::Opened { id, remote_addr }); + } - fn into_iter(mut self) -> Self::IntoIter { - self.get().into_iter() + /// Records that a path was abandoned by `noq`. + pub(crate) fn record_abandoned(&self, id: PathId, conn: &noq::Connection) { + let removed = { + let mut state = self.shared.state.lock().expect("poisoned"); + if state.selected == Some(id) { + state.selected = None; + } + state + .list + .iter() + .position(|e| e.handle.id() == id) + .map(|pos| state.list.remove(pos)) + }; + if let Some(data) = removed { + let stats = conn + .path_stats(data.handle.id()) + .expect("Holding a WeakPathHandle makes Connection::path_stats infallible"); + self.shared.notify.notify_waiters(); + let _ = self.events.send(PathEvent::Closed { + id, + remote_addr: data.remote_addr, + last_stats: Box::new(stats), + }); + } } -} -impl Iterator for PathInfoListIter { - type Item = PathInfo; + /// Updates the selected transmission path. + pub(crate) fn record_selected(&self, remote_addr: TransportAddr) { + let changed = { + let mut state = self.shared.state.lock().expect("poisoned"); + let selected_path_id = state + .list + .iter() + .find(|p| p.remote_addr == remote_addr) + .map(|p| p.handle.id()); + if selected_path_id != state.selected { + state.selected = selected_path_id; + selected_path_id.map(|path_id| (path_id, remote_addr)) + } else { + None + } + }; + if let Some((id, remote_addr)) = changed { + let _ = self.events.send(PathEvent::Selected { id, remote_addr }); + self.shared.notify.notify_waiters(); + } + } - fn next(&mut self) -> Option { - self.0.next() + /// Closes the writer side of the path observation. + /// + /// Emits a final [`PathEvent::Closed`] for every remaining open + /// path with its statistics taken from `closed.path_stats`, marks + /// the state closed, and drops the sender. No-op if already closed. + /// + /// # Panics + /// + /// Panics if `closed.path_stats` is missing an entry for a path + /// still tracked by `self`. `noq` retains stats for every path + /// with a live [`WeakPathHandle`], and `self` holds one per entry, + /// so the entry must be present. + /// + /// [`WeakPathHandle`]: noq::WeakPathHandle + pub(crate) fn close(self, closed: noq::Closed) { + let mut state = self.shared.state.lock().expect("poisoned"); + if !state.closed { + for entry in state.list.iter() { + let stats = closed + .path_stats + .iter() + .find(|(id, _stats)| *id == entry.handle.id()) + .map(|(_id, stats)| stats) + .expect("noq::Closed contains stats for every path with a live handle"); + let _ = self.events.send(PathEvent::Closed { + id: entry.handle.id(), + remote_addr: entry.remote_addr.clone(), + last_stats: Box::new(*stats), + }); + } + state.closed = true; + self.shared.notify.notify_waiters(); + } } } -/// The value watched by the [`PathWatchable`]. -/// -/// A list of paths, and a `closed` flag to indicate when the connection closed so that -/// the watcher can be stopped. -#[derive(Clone, Debug, Eq, PartialEq)] -struct CloseablePathList { - /// The list of network paths. - paths: PathInfoList, - /// Set to `true` before the `RemoteStateActor` drops the `PathWatchable`. - /// - /// Afterwards, no further updates will be received. - closed: bool, +impl Drop for PathStateSender { + fn drop(&mut self) { + let mut state = self.shared.state.lock().expect("poisoned"); + if !state.closed { + state.closed = true; + self.shared.notify.notify_waiters(); + } + } } -/// Watcher for the open paths and selected network path in a connection. +/// The reader-side handle for a connection's path state. /// -/// See [`Connection::paths`] for details. +/// Held by a [`Connection`]. Cheap to clone. /// -/// [`Connection::paths`]: crate::endpoint::Connection::paths +/// [`Connection`]: crate::endpoint::Connection #[derive(Clone, Debug)] -pub struct PathWatcher { - paths_watcher: n0_watcher::Direct, - selected_path_watcher: n0_watcher::Direct>, - current_paths: PathInfoList, - current_selected_path: Option, +pub(crate) struct PathStateReceiver { + shared: Arc, + events: broadcast::WeakSender, } -impl PathWatcher { - /// Update the selected path from [`Self::selected_path_watcher`]. - /// - /// This sets [`Self::current_selected_path`] to the current value from - /// [`Self::selected_path_watcher`], but only if the latter is non-empty. +impl PathStateReceiver { + /// Returns a snapshot of the currently-open paths, tied to `conn`. + pub(crate) fn get<'a>(&self, conn: &'a noq::Connection) -> PathList<'a> { + PathList { + snapshot: self.shared.state.lock().expect("poisoned").clone(), + conn, + } + } + + /// Returns a stream of [`PathEvent`]s. /// - /// It also updates the [`PathInfo::is_selected`] field for all - /// current paths. - fn update_selected(&mut self) { - if let Some(path) = self.selected_path_watcher.peek() - && Some(path) != self.current_selected_path.as_ref() - { - self.current_selected_path = Some(path.clone()); + /// Already closed if the sender has been dropped. + pub(crate) fn events(&self) -> PathEventStream { + let receiver = if let Some(sender) = self.events.upgrade() { + sender.subscribe() + } else { + let (_tx, rx) = broadcast::channel(1); + rx + }; + PathEventStream { + inner: BroadcastStream::new(receiver), } + } - if let Some(selected_path) = self.current_selected_path.as_ref() { - for p in self.current_paths.0.iter_mut() { - p.is_selected = selected_path == p.remote_addr(); - } + /// Returns a stream of [`PathList`] snapshots tied to `conn`. + /// + /// Yields the current snapshot on the first poll, then a fresh + /// snapshot on every state change. Ends when the state is marked + /// closed. + pub(crate) fn updates<'a>(&'a self, conn: &'a noq::Connection) -> PathListStream<'a> { + PathListStream { + shared: &self.shared, + conn, + notified: Box::pin(self.shared.notify.notified()), + first_poll: true, } } } -impl Watcher for PathWatcher { - type Value = PathInfoList; - - fn update(&mut self) -> bool { - let mut updated = false; +/// A borrowed snapshot of a connection's currently-open paths. +/// +/// Returned by [`Connection::paths`]. The list is captured at call +/// time and does not reflect later changes. Closed paths are not +/// retained; to track per-path totals over the connection's lifetime, +/// accumulate from [`PathEvent::Closed`]. +/// +/// [`Connection::paths`]: crate::endpoint::Connection::paths +#[derive(Clone, derive_more::Debug)] +pub struct PathList<'conn> { + snapshot: State, + #[debug(skip)] + conn: &'conn noq::Connection, +} - if self.paths_watcher.update() { - updated = true; - self.current_paths = self.paths_watcher.peek().paths.clone(); - } +impl<'conn> PathList<'conn> { + /// Returns the number of open paths. + pub fn len(&self) -> usize { + self.snapshot.list.len() + } - if self.selected_path_watcher.update() { - // `Self::current_selected_path` is set in `Self::update_selected` below. - updated = true; - } + /// Returns `true` if no paths are open. + pub fn is_empty(&self) -> bool { + self.snapshot.list.is_empty() + } - if updated { - self.update_selected(); + /// Returns an iterator over the open paths. + pub fn iter(&self) -> PathListIter<'_> { + PathListIter { + inner: self.snapshot.list.iter(), + selected: self.snapshot.selected, + conn: self.conn, } + } - updated + /// Returns the path that was selected when this snapshot was taken. + /// + /// Returns `None` if no path was selected at that moment. + pub fn selected(&self) -> Option> { + self.get(self.snapshot.selected?) } - fn peek(&self) -> &Self::Value { - &self.current_paths + /// Returns the path with the given [`PathId`]. + /// + /// Returns `None` if no open path with that id is present in + /// this snapshot. + pub fn get(&self, id: PathId) -> Option> { + self.iter().find(|p| p.id() == id) } +} - fn is_connected(&self) -> bool { - self.paths_watcher.is_connected() - && self.selected_path_watcher.is_connected() - && !self.paths_watcher.peek().closed +impl<'a> IntoIterator for &'a PathList<'a> { + type IntoIter = PathListIter<'a>; + type Item = Path<'a>; + fn into_iter(self) -> Self::IntoIter { + self.iter() } +} + +/// An iterator over the open paths in a [`PathList`] snapshot. +#[derive(Debug)] +pub struct PathListIter<'a> { + inner: std::slice::Iter<'a, PathData>, + selected: Option, + conn: &'a noq::Connection, +} - fn poll_updated( - &mut self, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - // When the `closed` flag is set on the watched value, we return `Disconnected` - // to end the watcher update stream. We can't rely on the watchable being dropped, - // because the watchable is cloned into the `Connection` and thus will stay alive - // until the last clone of a connection is dropped. However, we want the watcher - // to end once the connection closes. Therefore we use a manual signal here instead. - if self.paths_watcher.peek().closed { - return Poll::Ready(Err(n0_watcher::Disconnected)); +impl<'a> PathListIter<'a> { + fn item(&self, data: &'a PathData) -> Path<'a> { + Path { + data, + conn: self.conn, + is_selected: self.selected == Some(data.handle.id()), } + } +} - let mut is_ready = false; +impl<'a> Iterator for PathListIter<'a> { + type Item = Path<'a>; - if self.paths_watcher.poll_updated(cx)?.is_ready() { - self.current_paths = self.paths_watcher.peek().paths.clone(); - is_ready = true; - } + fn next(&mut self) -> Option { + self.inner.next().map(|d| self.item(d)) + } - if self.selected_path_watcher.poll_updated(cx)?.is_ready() { - // `Self::current_selected_path` is set in `Self::update_selected` below. - is_ready = true; - } + fn size_hint(&self) -> (usize, Option) { + self.inner.size_hint() + } +} - if is_ready { - self.update_selected(); - Poll::Ready(Ok(())) - } else { - Poll::Pending - } +impl<'a> DoubleEndedIterator for PathListIter<'a> { + fn next_back(&mut self) -> Option { + self.inner.next_back().map(|d| self.item(d)) } } -/// Information about a network path used by a [`Connection`]. +impl ExactSizeIterator for PathListIter<'_> {} + +/// A single path within a [`PathList`] snapshot. +/// +/// Borrows from the enclosing [`PathList`] and from the [`Connection`] +/// that produced it, so a [`Path`] cannot cross a task boundary. +/// Clone [`remote_addr`](Self::remote_addr) or read +/// [`stats`](Self::stats) into an owned value first. /// /// [`Connection`]: crate::endpoint::Connection -#[derive(derive_more::Debug, Clone, Eq, PartialEq)] -pub struct PathInfo { - #[debug("{}", path.id())] - path: WeakPathHandle, - remote_addr: TransportAddr, - is_abandoned: bool, +#[derive(Clone, Debug)] +pub struct Path<'a> { + data: &'a PathData, + conn: &'a noq::Connection, is_selected: bool, } -impl PathInfo { - fn new(conn: &noq::Connection, id: PathId, remote_addr: TransportAddr) -> Option { - let path = conn.path(id)?; - Some(PathInfo { - path: path.weak_handle(), - remote_addr, - is_abandoned: path.status().is_err(), - is_selected: false, - }) - } - - /// Returns the [`PathId`] of this path. - /// - /// Path ids are unique-per-connection identifiers for a network path. A path id will never - /// be reused within a connection. +impl<'conn> Path<'conn> { + /// Returns the path's [`PathId`]. pub fn id(&self) -> PathId { - self.path.id() + self.data.handle.id() } - /// The remote transport address used by this network path. + /// Returns the path's remote transport address. pub fn remote_addr(&self) -> &TransportAddr { - &self.remote_addr + &self.data.remote_addr } - /// Returns `true` if this path is currently the main transmission path for this [`Connection`]. - /// - /// [`Connection`]: crate::endpoint::Connection + /// Returns `true` if this was the selected transmission path when + /// the enclosing [`PathList`] snapshot was taken. pub fn is_selected(&self) -> bool { self.is_selected } - /// Returns `true` if this path is closed. - /// - /// A path is considered closed as soon as the local endpoint has abandoned this path. - /// A closed path will remain closed forever, so once this returns `true` it will never - /// return `false` afterwards. If the network path becomes available again in the future, - /// a new path might be opened, but a closed path will never be reopened. - pub fn is_closed(&self) -> bool { - self.is_abandoned - } - - /// Whether this is an IP transport path. + /// Returns `true` if this is a direct IP path. pub fn is_ip(&self) -> bool { - self.remote_addr().is_ip() + self.data.remote_addr.is_ip() } - /// Whether this is a relay transport path. + /// Returns `true` if this is a relay path. pub fn is_relay(&self) -> bool { - self.remote_addr().is_relay() + self.data.remote_addr.is_relay() } - /// Returns stats for this network path. + /// Returns the path's statistics. /// - /// Returns `None` if the underlying connection has been dropped. - pub fn stats(&self) -> Option { - self.path.upgrade().map(|p| p.stats()) + /// Returns live statistics from the QUIC state for an open path, or + /// the final statistics retained by `noq` for a path that closed + /// after this snapshot was taken. + pub fn stats(&self) -> PathStats { + self.conn + .path_stats(self.data.handle.id()) + .expect("Holding a WeakPathHandle makes noq::Connection::path_stats infallible") } - /// Current best estimate of this paths's latency (round-trip-time). - /// - /// Returns `None` if the underlying connection has been dropped. - pub fn rtt(&self) -> Option { - self.stats().map(|s| s.rtt) + /// Returns the path's round-trip time estimate. + pub fn rtt(&self) -> Duration { + self.stats().rtt } } -/// Watchable for the network paths in a connection. +/// A stream of [`PathList`] snapshots for a connection. /// -/// This contains a watchable over a [`CloseablePathList`], and a watchable over the selected path for a remote. +/// Returned by [`Connection::path_updates`]. Yields the current +/// snapshot on the first poll and a fresh snapshot whenever the open +/// paths or the selected path change. Ends when the connection closes. /// -/// This struct is owned by the [`super::ConnectionState`] and also cloned into the [`Connection`]. -/// Most methods are `pub(super)`. The only method that may be called from [`Connection`] is -/// [`Self::watch`]. -/// -/// [`Connection`]: crate::endpoint::Connection -#[derive(Debug, Clone)] -pub(crate) struct PathWatchable { - paths: Watchable, - selected_path: Watchable>, +/// [`Connection::path_updates`]: crate::endpoint::Connection::path_updates +#[derive(Debug)] +pub struct PathListStream<'conn> { + shared: &'conn Shared, + conn: &'conn noq::Connection, + notified: Pin>>, + first_poll: bool, } -impl PathWatchable { - pub(super) fn new(selected_path: Watchable>) -> Self { - let value = CloseablePathList { - paths: Default::default(), - closed: false, - }; - Self { - paths: Watchable::new(value), - selected_path, - } - } - - /// Mark the path watchable as closed. - /// - /// Once called, watchers will not receive further updates. Must be called once the - /// [`super::ConnectionState`] that owns this [`PathWatchable`] is dropped. - /// - /// We can't rely on dropping the [`Watchable`] to close the watchers, because the - /// `Watchable` is cloned into the [`crate::endpoint::Connection`], and thus may stay - /// alive even after we dropped the [`super::ConnectionState`], which is the only place - /// that can update the [`PathWatchable]. - pub(super) fn close(&self) { - let mut value = self.paths.get(); - value.closed = true; - self.paths.set(value).ok(); - } +impl<'conn> Stream for PathListStream<'conn> { + type Item = PathList<'conn>; - /// Inserts a new path. - pub(super) fn insert(&self, conn: &noq::Connection, id: PathId, remote_addr: TransportAddr) { - if let Some(data) = PathInfo::new(conn, id, remote_addr) { - self.update(move |list| list.0.push(data)); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + if this.first_poll { + this.first_poll = false; + } else { + std::task::ready!(this.notified.as_mut().poll(cx)); + this.notified.set(this.shared.notify.notified()); } - } - - /// Marks a path as abandoned. - /// - /// If there are no watchers, the path will be removed from the watchable's value. - /// If there are watchers, the path will not be removed so that the watcher can still access the path's stats. - pub(super) fn set_abandoned(&self, id: PathId) { - self.update(|list| { - if let Some(item) = list.0.iter_mut().find(|p| p.path.id() == id) { - item.is_abandoned = true; - } - }); - } - - /// Updates the watchable's value through a closure. - /// - /// After the update is performed, and if there are currently no watchers, data for abandoned paths - /// is removed from the path list. - fn update(&self, f: impl FnOnce(&mut PathInfoList)) { - let mut value = self.paths.get(); - f(&mut value.paths); - // Remove abandoned paths from the list if we don't have watchers currently. - if !self.paths.has_watchers() { - value.paths.0.retain(|p| !p.is_abandoned); - value.paths.0.shrink_to_fit(); + this.notified.as_mut().enable(); + let snapshot = this.shared.state.lock().expect("poisoned").clone(); + if snapshot.closed { + Poll::Ready(None) + } else { + Poll::Ready(Some(PathList { + snapshot, + conn: this.conn, + })) } - self.paths.set(value).ok(); } +} - /// Returns a [`PathWatcher`] for this watchable. - pub(crate) fn watch(&self) -> PathWatcher { - let paths_watcher = self.paths.watch(); - let selected_path_watcher = self.selected_path.watch(); - let mut watcher = PathWatcher { - current_paths: paths_watcher.peek().paths.clone(), - // Set via `update_selected` below. - current_selected_path: None, - paths_watcher, - selected_path_watcher, - }; - watcher.update_selected(); - watcher +/// A `'static` stream of [`PathEvent`]s. +/// +/// Returned by [`Connection::path_events`]. +/// +/// [`Connection::path_events`]: crate::endpoint::Connection::path_events +#[derive(Debug)] +pub struct PathEventStream { + inner: BroadcastStream, +} + +impl Stream for PathEventStream { + type Item = PathEvent; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_next(cx).map(|event| match event? { + Ok(event) => Some(event), + Err(BroadcastStreamRecvError::Lagged(missed)) => Some(PathEvent::Lagged { missed }), + }) } } diff --git a/iroh/src/test_utils/test_transport.rs b/iroh/src/test_utils/test_transport.rs index 59a120c2512..0a7e196be03 100644 --- a/iroh/src/test_utils/test_transport.rs +++ b/iroh/src/test_utils/test_transport.rs @@ -324,7 +324,6 @@ mod tests { use iroh_relay::RelayMap; use n0_error::{Result, StdResultExt}; use n0_tracing_test::traced_test; - use n0_watcher::Watcher; use super::*; use crate::{ @@ -425,7 +424,7 @@ mod tests { /// Returns true if the selected path is the custom transport. fn is_custom_selected(conn: &crate::endpoint::Connection) -> bool { - let paths = conn.paths().get(); + let paths = conn.paths(); paths.iter().find(|p| p.is_selected()).is_some_and( |p| matches!(p.remote_addr(), TransportAddr::Custom(a) if a.id() == TEST_TRANSPORT_ID), ) @@ -435,7 +434,7 @@ mod tests { /// - we have both IP and custom paths, and the selected path is IP. /// - we only have one path fn is_ip_selected_from_ip_and_custom(conn: &crate::endpoint::Connection) -> bool { - let paths = conn.paths().get(); + let paths = conn.paths(); let has_ip = paths.iter().any(|p| p.remote_addr().is_ip()); let has_custom = paths.iter().any(|p| p.remote_addr().is_custom()); if !has_ip || !has_custom { @@ -448,7 +447,7 @@ mod tests { /// Returns true if the selected path is a relay transport. fn is_relay_selected(conn: &crate::endpoint::Connection) -> bool { - let paths = conn.paths().get(); + let paths = conn.paths(); paths .iter() .find(|p| p.is_selected()) @@ -489,7 +488,7 @@ mod tests { .await?; // Verify exactly one path exists and it's the custom transport - let paths = conn.paths().get(); + let paths = conn.paths(); assert_eq!(paths.len(), 1, "Expected exactly one path"); assert!( is_custom_selected(&conn), @@ -689,7 +688,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(200)).await; // Debug: print paths after relay-only connect - let paths = conn.paths().get(); + let paths = conn.paths(); eprintln!("Paths after relay-only connect:"); for path in paths.iter() { eprintln!( @@ -718,7 +717,7 @@ mod tests { tokio::time::sleep(Duration::from_millis(200)).await; // Debug: print all paths - let paths = conn.paths().get(); + let paths = conn.paths(); eprintln!("Paths after connecting with all addresses:"); for path in paths.iter() { eprintln!( diff --git a/iroh/tests/patchbay.rs b/iroh/tests/patchbay.rs index cd56bf03990..4ec1c1ffb21 100644 --- a/iroh/tests/patchbay.rs +++ b/iroh/tests/patchbay.rs @@ -36,7 +36,8 @@ use patchbay::{IfaceConfig, LinkCondition, LinkDirection, Nat}; use testdir::testdir; use tracing::info; -use self::util::{Pair, PathWatcherExt, lab_with_relay, ping_accept, ping_open}; +use self::util::{Pair, PathConnectionExt, lab_with_relay, ping_accept, ping_open}; +use crate::util::is_relayed; // Because we're in an integration test, we can't declare modules under patchbay/ // without setting an explicit path. @@ -77,12 +78,8 @@ async fn holepunch_simple() -> Result { Ok(()) }) .client(client, async move |_dev, _ep, conn| { - let mut paths = conn.paths(); - assert!(paths.selected().is_relay(), "connection started relayed"); - paths - .wait_ip(timeout) - .await - .context("holepunch to direct")?; + assert!(is_relayed(&conn), "connection started relayed"); + conn.wait_ip(timeout).await.context("holepunch to direct")?; info!("connection became direct"); Ok(()) }) @@ -120,13 +117,12 @@ async fn run_add_faster_link(active_side: Side) -> Result { let timeout = Duration::from_secs(15); Pair::new(relay_map) .left(active_side, active, async move |dev, _ep, conn| { - let mut paths = conn.paths(); - assert!(paths.selected().is_relay(), "connection started relayed"); - let first = paths + assert!(is_relayed(&conn), "connection started relayed"); + let first = conn .wait_ip(timeout) .await .context("did not become direct")?; - info!(addr=?first.remote_addr(), "connection became direct"); + info!(addr=?first, "connection became direct"); ping_accept(&conn, timeout) .await .context("ping_accept before switch")?; @@ -134,13 +130,11 @@ async fn run_add_faster_link(active_side: Side) -> Result { info!("bring up faster link (eth1)"); dev.iface("eth1").unwrap().link_up().await?; - let next = paths - .wait_selected(timeout, |p| { - p.is_ip() && p.remote_addr() != first.remote_addr() - }) + let next = conn + .wait_selected(timeout, |p| p.is_ip() && p.remote_addr() != &first) .await .context("did not switch paths")?; - info!(addr=?next.remote_addr(), "new direct path established"); + info!(addr=?next, "new direct path established"); ping_accept(&conn, timeout) .await .context("ping_accept after switch")?; @@ -149,24 +143,21 @@ async fn run_add_faster_link(active_side: Side) -> Result { Ok(()) }) .right(passive, async move |_dev, _ep, conn| { - let mut paths = conn.paths(); - assert!(paths.selected().is_relay(), "connection started relayed"); - let first = paths + assert!(is_relayed(&conn), "connection started relayed"); + let first = conn .wait_ip(timeout) .await .context("did not become direct")?; - info!(addr=?first.remote_addr(), "connection became direct"); + info!(addr=?first, "connection became direct"); ping_open(&conn, timeout) .await .context("ping_open before switch")?; - let next = paths - .wait_selected(timeout, |p| { - p.is_ip() && p.remote_addr() != first.remote_addr() - }) + let next = conn + .wait_selected(timeout, |p| p.is_ip() && p.remote_addr() != &first) .await .context("did not switch paths")?; - info!(addr=?next.remote_addr(), "new direct path established"); + info!(addr=?next, "new direct path established"); ping_open(&conn, timeout) .await .context("ping_open after switch")?; @@ -205,8 +196,7 @@ async fn run_link_outage_recovery(outage_side: Side, downtime: Duration) -> Resu let timeout = Duration::from_secs(15); Pair::new(relay_map) .left(outage_side, outage, async move |dev, _ep, conn| { - let mut paths = conn.paths(); - paths.wait_ip(timeout).await.context("initial holepunch")?; + conn.wait_ip(timeout).await.context("initial holepunch")?; info!("holepunched, now killing link for {downtime:?}"); dev.iface("eth0").unwrap().link_down().await?; tokio::time::sleep(downtime).await; @@ -218,8 +208,7 @@ async fn run_link_outage_recovery(outage_side: Side, downtime: Duration) -> Resu .context("ping_open after link_up")?; info!("connection recovered after link outage"); - paths - .wait_ip(timeout) + conn.wait_ip(timeout) .await .context("did not re-establish direct path")?; ping_open(&conn, timeout) @@ -278,8 +267,7 @@ async fn run_hard_nat_to_holepunchable(replug_side: Side) -> Result { let timeout = Duration::from_secs(15); Pair::new(relay_map) .left(replug_side, replug, async move |dev, _ep, conn| { - let mut paths = conn.paths(); - assert!(paths.selected().is_relay(), "connection started relayed"); + assert!(is_relayed(&conn), "connection started relayed"); ping_accept(&conn, timeout) .await @@ -287,15 +275,17 @@ async fn run_hard_nat_to_holepunchable(replug_side: Side) -> Result { tokio::time::sleep(Duration::from_secs(3)).await; assert!( - paths.selected().is_relay(), + conn.paths() + .selected() + .expect("no selected path") + .is_relay(), "should still be relayed behind symmetric NAT" ); info!("replug to holepunchable NAT"); dev.iface("eth0").unwrap().replug(nat_easy.id()).await?; - paths - .wait_ip(timeout) + conn.wait_ip(timeout) .await .context("did not become direct after replug")?; info!("connection became direct"); @@ -307,11 +297,9 @@ async fn run_hard_nat_to_holepunchable(replug_side: Side) -> Result { Ok(()) }) .right(stable, async move |_dev, _ep, conn| { - let mut paths = conn.paths(); - assert!(paths.selected().is_relay(), "connection started relayed"); + assert!(is_relayed(&conn), "connection started relayed"); ping_open(&conn, timeout).await.context("ping 1 (relay)")?; - paths - .wait_ip(timeout) + conn.wait_ip(timeout) .await .context("did not become direct after replug")?; info!("connection became direct"); @@ -356,10 +344,8 @@ async fn run_holepunch_many_addrs(many_addrs_side: Side, addr_count: u8) -> Resu let timeout = Duration::from_secs(15); Pair::new(relay_map) .left(many_addrs_side, many_addrs, async move |_dev, _ep, conn| { - let mut paths = conn.paths(); - assert!(paths.selected().is_relay(), "connection started relayed"); - paths - .wait_ip(timeout) + assert!(is_relayed(&conn), "connection started relayed"); + conn.wait_ip(timeout) .await .context("holepunch to direct with many addrs")?; info!("connection became direct"); @@ -368,10 +354,8 @@ async fn run_holepunch_many_addrs(many_addrs_side: Side, addr_count: u8) -> Resu Ok(()) }) .right(plain, async move |_dev, _ep, conn| { - let mut paths = conn.paths(); - assert!(paths.selected().is_relay(), "connection started relayed"); - paths - .wait_ip(timeout) + assert!(is_relayed(&conn), "connection started relayed"); + conn.wait_ip(timeout) .await .context("holepunch to direct with many addrs")?; info!("connection became direct"); diff --git a/iroh/tests/patchbay/degrade.rs b/iroh/tests/patchbay/degrade.rs index b4fcef7bf4e..74b597a5566 100644 --- a/iroh/tests/patchbay/degrade.rs +++ b/iroh/tests/patchbay/degrade.rs @@ -9,7 +9,7 @@ use patchbay::{LinkCondition, LinkDirection, LinkLimits, Nat}; use testdir::testdir; use tracing::info; -use super::util::{Pair, PathWatcherExt, lab_with_relay, ping_accept, ping_open}; +use super::util::{Pair, PathConnectionExt, lab_with_relay, ping_accept, ping_open}; /// Increasingly degraded link conditions applied to one side of the connection. /// @@ -123,12 +123,8 @@ async fn run_degrade_level(impaired_side: Side, level: usize) -> Result<()> { Ok(()) }) .client(client, async move |_dev, _ep, conn| { - let mut paths = conn.paths(); info!("waiting for connection to become direct"); - paths - .wait_ip(timeout) - .await - .context("holepunch to direct")?; + conn.wait_ip(timeout).await.context("holepunch to direct")?; info!("direct path established, sending ping"); ping_open(&conn, timeout).await.context("ping_open")?; info!("ping complete"); diff --git a/iroh/tests/patchbay/nat.rs b/iroh/tests/patchbay/nat.rs index b482a083501..f6690e30a1f 100644 --- a/iroh/tests/patchbay/nat.rs +++ b/iroh/tests/patchbay/nat.rs @@ -19,8 +19,8 @@ use patchbay::{Nat, NatConfig, NatFiltering, NatMapping}; use testdir::testdir; use tracing::info; -use super::util::{Pair, PathWatcherExt, lab_with_relay}; -use crate::util::{ping_accept, ping_open}; +use super::util::{Pair, PathConnectionExt, lab_with_relay}; +use crate::util::{is_relayed, ping_accept, ping_open}; enum NatKind { /// No NAT. The device has a publicly routable address. @@ -103,24 +103,16 @@ async fn run_nat_holepunch(nat_server: NatKind, nat_client: NatKind) -> Result { let timeout = Duration::from_secs(15); Pair::new(relay_map) .server(server, async move |_dev, _ep, conn| { - let mut paths = conn.paths(); - assert!(paths.selected().is_relay(), "connection started relayed"); - paths - .wait_ip(timeout) - .await - .context("holepunch to direct")?; + assert!(is_relayed(&conn), "connection started relayed"); + conn.wait_ip(timeout).await.context("holepunch to direct")?; info!("connection became direct"); ping_accept(&conn, timeout).await?; conn.closed().await; Ok(()) }) .client(client, async move |_dev, _ep, conn| { - let mut paths = conn.paths(); - assert!(paths.selected().is_relay(), "connection started relayed"); - paths - .wait_ip(timeout) - .await - .context("holepunch to direct")?; + assert!(is_relayed(&conn), "connection started relayed"); + conn.wait_ip(timeout).await.context("holepunch to direct")?; info!("connection became direct"); ping_open(&conn, timeout).await?; conn.close(0u32.into(), b"bye"); diff --git a/iroh/tests/patchbay/switch-uplink.rs b/iroh/tests/patchbay/switch-uplink.rs index 1b46235450b..77c3a5cadaf 100644 --- a/iroh/tests/patchbay/switch-uplink.rs +++ b/iroh/tests/patchbay/switch-uplink.rs @@ -13,14 +13,14 @@ use std::time::Duration; -use iroh::{TransportAddr, Watcher, endpoint::Side}; +use iroh::{TransportAddr, endpoint::Side}; use n0_error::{Result, StackResultExt}; use n0_tracing_test::traced_test; use patchbay::{IpSupport, RouterPreset}; use testdir::testdir; use tracing::info; -use crate::util::{Pair, PathWatcherExt, lab_with_relay, ping_accept, ping_open}; +use crate::util::{Pair, PathConnectionExt, lab_with_relay, ping_accept, ping_open}; /// Builds the lab topology and runs a single uplink switch test. /// @@ -70,8 +70,7 @@ async fn run_switch_uplink(switching_side: Side, from: IpSupport, to: IpSupport) Pair::new(relay_map) .left(switching_side, switcher, async move |dev, _ep, conn| { - let mut paths = conn.paths(); - paths.wait_ip(timeout).await.context("initial holepunch")?; + conn.wait_ip(timeout).await.context("initial holepunch")?; ping_accept(&conn, timeout) .await .context("ping_accept before switch")?; @@ -83,18 +82,16 @@ async fn run_switch_uplink(switching_side: Side, from: IpSupport, to: IpSupport) Ok(()) }) .right(observer, async move |_dev, _ep, conn| { - let mut paths = conn.paths(); - paths.wait_ip(timeout).await.context("initial holepunch")?; - let previous: Vec = paths - .get() + conn.wait_ip(timeout).await.context("initial holepunch")?; + let previous: Vec = conn + .paths() .iter() .map(|p| p.remote_addr().clone()) .collect(); ping_open(&conn, timeout) .await .context("ping_open before switch")?; - paths - .wait_selected(timeout, |p| path_switched(to, &previous, p.remote_addr())) + conn.wait_selected(timeout, |p| path_switched(to, &previous, p.remote_addr())) .await .context("path did not switch")?; ping_open(&conn, timeout) diff --git a/iroh/tests/patchbay/util.rs b/iroh/tests/patchbay/util.rs index a01ec865f6f..c678689202b 100644 --- a/iroh/tests/patchbay/util.rs +++ b/iroh/tests/patchbay/util.rs @@ -1,13 +1,13 @@ use std::{future::Future, path::PathBuf, sync::Arc, time::Duration}; use iroh::{ - Endpoint, EndpointAddr, RelayMap, RelayMode, Watcher, - endpoint::{Connection, PathInfo, PathWatcher, presets}, + Endpoint, EndpointAddr, RelayMap, RelayMode, TransportAddr, + endpoint::{Connection, Path, PathEvent, presets}, tls::CaRootsConfig, }; use iroh_metrics::MetricsGroupSet; use n0_error::{Result, StackResultExt, StdResultExt, anyerr, ensure_any}; -use n0_future::{boxed::BoxFuture, task::AbortOnDropHandle}; +use n0_future::{StreamExt, boxed::BoxFuture, task::AbortOnDropHandle}; use noq::Side; use patchbay::{Device, IpSupport, Lab, OutDir, TestGuard}; use tokio::sync::{Barrier, oneshot}; @@ -336,69 +336,54 @@ fn log_result_on_device(dev: &Device, res }); } -/// Extension methods on [`PathWatcher`] for common waiting patterns in tests. -#[allow(unused)] -pub trait PathWatcherExt { - /// Waits until the selected path fulfills a condition. - /// - /// Calls `f` with the currently-selected path, and again after each path update, - /// until `f` returns true or `timeout` elapses. - /// - /// Returns an error if the timeout elapses before `f` returned true. +/// Extension trait on [`Connection`] providing timeout-bounded wait helpers +/// on top of [`Connection::paths`] and [`PathList::stream`]. +pub trait PathConnectionExt { + /// Waits until the selected path satisfies `f`. Returns the matching + /// path's [`TransportAddr`]. async fn wait_selected( - &mut self, + &self, timeout: Duration, - f: impl Fn(&PathInfo) -> bool, - ) -> Result; - - /// Returns the currently selected path. - /// - /// Panics if no path is marked as selected. - fn selected(&mut self) -> PathInfo; + f: impl FnMut(&Path<'_>) -> bool, + ) -> Result; - /// Wait until the selected path is a direct (IP) path. - async fn wait_ip(&mut self, timeout: Duration) -> Result { - self.wait_selected(timeout, PathInfo::is_ip) + /// Waits until the selected path is a direct (IP) path. + async fn wait_ip(&self, timeout: Duration) -> Result { + self.wait_selected(timeout, |p| p.is_ip()) .await .context("wait_ip") } - - /// Wait until the selected path is a relay path. - async fn wait_relay(&mut self, timeout: Duration) -> Result { - self.wait_selected(timeout, PathInfo::is_relay) - .await - .context("wait_relay") - } } -impl PathWatcherExt for PathWatcher { - fn selected(&mut self) -> PathInfo { - let p = self.get(); - p.iter() - .find(|p| p.is_selected()) - .cloned() - .expect("no selected path") - } - +impl PathConnectionExt for Connection { async fn wait_selected( - &mut self, + &self, timeout: Duration, - f: impl Fn(&PathInfo) -> bool, - ) -> Result { + mut f: impl FnMut(&Path<'_>) -> bool, + ) -> Result { + let mut stream = self.path_updates(); tokio::time::timeout(timeout, async { - loop { - let selected = self.selected(); + while let Some(paths) = stream.next().await { + let selected = paths.selected().expect("no selected path"); if f(&selected) { - return n0_error::Ok(selected); + return Ok(selected.remote_addr().clone()); } - self.updated().await?; } + Err(anyerr!("path stream ended")) }) .await .with_std_context(|_| format!("wait_selected timed out after {timeout:?}"))? } } +/// Returns `true` if the currently selected path is a relay path. +pub fn is_relayed(conn: &iroh::endpoint::Connection) -> bool { + conn.paths() + .selected() + .expect("no selected path") + .is_relay() +} + /// Opens a bidi stream, sends 8 bytes of data, and waits to receive the same data back. pub async fn ping_open(conn: &Connection, timeout: Duration) -> Result { tokio::time::timeout(timeout, async { @@ -438,24 +423,15 @@ pub async fn ping_accept(conn: &Connection, timeout: Duration) -> Result { } fn watch_selected_path(conn: &Connection) { - let mut watcher = conn.paths(); + let mut events = conn.path_events(); + if let Some(path) = conn.paths().selected() { + debug!("selected path: [{}] {}", path.id(), path.remote_addr()); + } tokio::spawn( async move { - let mut prev = None; - loop { - let paths = watcher.get(); - let selected = paths.iter().find(|p| p.is_selected()).unwrap(); - if Some(selected) != prev.as_ref() { - debug!( - "selected path: [{}] {:?} rtt {:?}", - selected.id(), - selected.remote_addr(), - selected.rtt().unwrap() - ); - prev = Some(selected.clone()); - } - if watcher.updated().await.is_err() { - break; + while let Some(event) = events.next().await { + if let PathEvent::Selected { id, remote_addr } = event { + debug!("selected path: [{id}] {remote_addr}"); } } }