Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions iroh/examples/custom-transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use iroh::{
test_utils::test_transport::{TEST_TRANSPORT_ID, TestNetwork, TestTransport},
};
use n0_error::{Result, StdResultExt};
use n0_watcher::Watcher;

/// Each protocol is identified by its ALPN string.
///
Expand Down Expand Up @@ -124,7 +123,7 @@ async fn main() -> Result<()> {

// Helper to print paths and verify test transport is selected
let verify_test_transport = |label: &str| {
let paths = conn.paths().get();
let paths = conn.paths();
println!("Paths {}:", label);
for path in paths.iter() {
println!(
Expand All @@ -134,16 +133,16 @@ async fn main() -> Result<()> {
path.rtt()
);
}
let selected_path = paths.iter().find(|p| p.is_selected());
let is_test_transport = selected_path.is_some_and(|p| {
let selected_path = paths.selected();
let is_test_transport = selected_path.as_ref().is_some_and(|p| {
matches!(p.remote_addr(), TransportAddr::Custom(addr) if addr.id() == TEST_TRANSPORT_ID)
});
assert!(
is_test_transport,
"Expected test transport (id={}) to be selected {}, got: {:?}",
TEST_TRANSPORT_ID,
label,
selected_path.map(|p| p.remote_addr())
selected_path.as_ref().map(|p| p.remote_addr())
);
println!(
"Verified: test transport (id={}) is selected {}",
Expand Down
4 changes: 2 additions & 2 deletions iroh/examples/monitor-connections.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, time::Duration};

use iroh::{
Endpoint, Watcher,
Endpoint,
endpoint::{
AfterHandshakeOutcome, Closed, Connection, EndpointHooks, WeakConnectionHandle, presets,
},
Expand Down Expand Up @@ -133,7 +133,7 @@ impl Monitor {
Some(MonitoredConnection { alpn, remote_id, handle }) = rx.recv() => {
let alpn = String::from_utf8_lossy(&alpn).to_string();
let remote = remote_id.fmt_short();
let rtt = handle.upgrade().and_then(|c| c.paths().peek().iter().map(|p| p.stats().expect("conn is not dropped").rtt).min());
let rtt = handle.upgrade().and_then(|c| c.paths().iter().map(|p| p.rtt()).min());
info!(%remote, %alpn, ?rtt, "new connection");
tasks.spawn(async move {
match handle.closed().await {
Expand Down
140 changes: 65 additions & 75 deletions iroh/examples/remote-info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ fn log_active(remote_map: &RemoteMap) {
"[{}] is_active {}, connections {}, ip_path {:?}, relay_path {:?}, current_min_rtt {:?}",
id.fmt_short(),
info.is_active(),
info.connections().count(),
info.active_connections(),
info.has_ip_path(),
info.has_relay_path(),
info.current_min_rtt()
info.current_min_rtt(),
);
}
}
Expand All @@ -126,7 +126,7 @@ fn log_aggregate(remote_map: &RemoteMap) {
aggregate.relay_path,
SystemTime::now()
.duration_since(aggregate.last_update)
.unwrap()
.unwrap_or_default()
);
}
}
Expand Down Expand Up @@ -166,14 +166,13 @@ mod remote_map {
};

use iroh::{
EndpointId, Watcher,
EndpointId, TransportAddr,
endpoint::{
AfterHandshakeOutcome, Connection, EndpointHooks, PathInfo, WeakConnectionHandle,
AfterHandshakeOutcome, Connection, EndpointHooks, PathEvent, WeakConnectionHandle,
},
};
use n0_future::task::AbortOnDropHandle;
use n0_future::{StreamExt, task::AbortOnDropHandle};
use tokio::{sync::mpsc, task::JoinSet};
use tokio_stream::StreamExt;
use tracing::{Instrument, debug, info, info_span};

/// Information about a remote info.
Expand Down Expand Up @@ -211,19 +210,13 @@ mod remote_map {
}

impl Aggregate {
fn update(&mut self, path: &PathInfo) {
fn update(&mut self, addr: TransportAddr, stats: iroh::endpoint::PathStats) {
self.last_update = SystemTime::now();
if path.is_ip() {
self.ip_path = true;
}
if path.is_relay() {
self.relay_path = true;
}
if let Some(stats) = path.stats() {
debug!("path update addr {:?} {stats:?}", path.remote_addr());
self.rtt_min = self.rtt_min.min(stats.rtt);
self.rtt_max = self.rtt_max.max(stats.rtt);
}
self.ip_path |= addr.is_ip();
self.relay_path |= addr.is_relay();
debug!("path update {addr} {stats:?}");
self.rtt_min = self.rtt_min.min(stats.rtt);
self.rtt_max = self.rtt_max.max(stats.rtt);
}
}

Expand All @@ -239,46 +232,37 @@ mod remote_map {
///
/// Returns `None` if there are no active connections.
pub fn current_min_rtt(&self) -> Option<Duration> {
self.connections()
.flat_map(|c| c.upgrade())
.flat_map(|c| c.paths().get().into_iter())
.flat_map(|p| p.stats())
.map(|s| s.rtt)
self.upgraded()
.filter_map(|c| c.paths().iter().map(|p| p.rtt()).min())
.min()
}

/// Returns whether any active connection to the remote has an active IP path.
///
/// Returns `None` if there are no active connections.
pub fn has_ip_path(&self) -> Option<bool> {
self.connections()
.flat_map(|c| c.upgrade())
.flat_map(|c| c.paths().get())
.filter(|path| path.is_ip())
.map(|_| true)
.next()
pub fn has_ip_path(&self) -> bool {
self.upgraded().any(|c| c.paths().iter().any(|p| p.is_ip()))
}

/// Returns whether any active connection to the remote has an active relay path.
///
/// Returns `None` if there are no active connections.
pub fn has_relay_path(&self) -> Option<bool> {
self.connections()
.flat_map(|c| c.upgrade())
.flat_map(|c| c.paths().get())
.filter(|path| path.is_relay())
.map(|_| true)
.next()
pub fn has_relay_path(&self) -> bool {
self.upgraded()
.any(|c| c.paths().iter().any(|p| p.is_relay()))
}

/// Returns `true` if there are active connections to this node.
/// Returns `true` if there are active connections to this remote.
pub fn is_active(&self) -> bool {
!self.connections.is_empty()
self.active_connections() > 0
}

/// Returns an iterator over [`WeakConnectionHandle`] for currently active connections to this remote.
pub fn connections(&self) -> impl Iterator<Item = &WeakConnectionHandle> {
self.connections.values()
/// Returns the number of active connections to this remote.
pub fn active_connections(&self) -> usize {
self.upgraded().count()
}

/// Returns an iterator over all active handles upgraded to a [`Connection`].
fn upgraded(&self) -> impl Iterator<Item = Connection> {
self.connections
.values()
.filter_map(WeakConnectionHandle::upgrade)
}
}

Expand Down Expand Up @@ -368,18 +352,14 @@ mod remote_map {
// Main loop
loop {
tokio::select! {
conn = rx.recv() => {
match conn {
Some(conn) => {
conn_id += 1;
Self::on_connection(&mut tasks, map.clone(), conn_id, conn);
},
None => break,
}
Some(conn) = rx.recv() => {
conn_id += 1;
Self::on_connection(&mut tasks, map.clone(), conn_id, conn);
}
Some(res) = tasks.join_next(), if !tasks.is_empty() => {
res.expect("conn close task panicked");
}
else => break,
}
}

Expand Down Expand Up @@ -417,34 +397,42 @@ mod remote_map {
let map = map.clone();
async move {
handle.closed().await;
{
let mut inner = map.write().expect("poisoned");
let info = inner.entry(remote_id).or_default();
info.connections.remove(&conn_id);
info.aggregate.last_update = SystemTime::now();
}
let mut inner = map.write().expect("poisoned");
let info = inner.entry(remote_id).or_default();
info.connections.remove(&conn_id);
info.aggregate.last_update = SystemTime::now();
}
.instrument(tracing::Span::current())
});

// Track path changes to update stats aggregate.
if let Some(watcher) = conn.handle.upgrade().map(|c| c.paths()) {
let remote_id = conn.remote_id;
tasks.spawn({
if let Some(mut path_events) = conn.handle.upgrade().map(|conn| conn.path_events()) {
tasks.spawn(
async move {
let mut path_updates = watcher.stream();
while let Some(paths) = path_updates.next().await {
{
let mut inner = map.write().expect("poisoned");
let info = inner.entry(remote_id).or_default();
for path in paths {
info.aggregate.update(&path);
while let Some(event) = path_events.next().await {
let mut inner = map.write().expect("poisoned");
let info = inner.entry(conn.remote_id).or_default();
match event {
PathEvent::Closed {
remote_addr,
last_stats,
..
} => {
info.aggregate.update(remote_addr, *last_stats);
}
_ => {
if let Some(conn) = conn.handle.upgrade() {
for path in conn.paths().iter() {
info.aggregate
.update(path.remote_addr().clone(), path.stats());
}
}
}
}
}
}
.instrument(tracing::Span::current())
});
.instrument(tracing::Span::current()),
);
}
}

Expand All @@ -459,8 +447,10 @@ mod remote_map {
let mut inner = map.write().expect("poisoned");
inner.retain(|_remote, info| {
info.is_active()
|| now.duration_since(info.aggregate().last_update).unwrap()
< retention_time
|| now
.duration_since(info.aggregate().last_update)
.map(|age| age < retention_time)
.unwrap_or(true)
});
}
}
Expand Down
Loading
Loading