Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 41 additions & 4 deletions iroh/examples/0rtt.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use std::{env, future::Future, str::FromStr, time::Instant};
use std::{
env,
future::Future,
str::FromStr,
time::{Duration, Instant},
};

use clap::Parser;
use data_encoding::HEXLOWER;
use iroh::{
SecretKey,
endpoint::{Connecting, Connection},
endpoint::{Connecting, Connection, ZRTTConnection},
};
use iroh_base::ticket::EndpointTicket;
use n0_future::{StreamExt, future};
use n0_snafu::ResultExt;
use n0_watcher::Watcher;
use quinn::VarInt;
use tracing::{info, trace};

const PINGPONG_ALPN: &[u8] = b"0rtt-pingpong";
Expand Down Expand Up @@ -50,7 +56,7 @@ pub fn get_or_generate_secret_key() -> n0_snafu::Result<SecretKey> {
/// read the response immediately. Otherwise, the stream pair is bad and we need
/// to open a new stream pair.
async fn pingpong(
connection: &Connection,
connection: &Conn,
proceed: impl Future<Output = bool>,
x: u64,
) -> n0_snafu::Result<()> {
Expand All @@ -74,16 +80,46 @@ async fn pingpong(
Ok(())
}

async fn pingpong_0rtt(connecting: Connecting, i: u64) -> n0_snafu::Result<Connection> {
enum Conn {
ZRTT(ZRTTConnection),
Full(Connection),
}
Comment on lines +83 to +86
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if you took an entirely different approach. This made sketched out with my proposed ZrttConnection::into_connection:

async fn pingpong(mut send: SendStream, mut recv: RecvStream, x: u64) -> n0_snafu::Result<()> {
    let data = x.to_be_bytes();
    send.write_all(&data).await.e()?;
    send.finish().e()?;
    let echo = recv.read_to_end(8).await.e()?;
    assert_eq!(echo, data);
    Ok(())
}

async fn connect(args: Args) -> n0_snafu::Result<()> {
    let endpoint_addr = args.endpoint.unwrap().endpoint_addr().clone();
    let endpoint = iroh::Endpoint::builder()
        .relay_mode(iroh::RelayMode::Disabled)
        .keylog(true)
        .bind()
        .await?;
    let t0 = Instant::now();
    for i in 0..args.rounds {
        let t0 = Instant::now();
        let connecting = endpoint
            .connect_with_opts(endpoint_addr.clone(), PINGPONG_ALPN, Default::default())
            .await?;
        let connection = if args.disable_0rtt {
            let connection = connecting.await.e()?;
            trace!("connecting without 0-RTT");
            let (send, recv) = connection.open_bi().await.e()?;
            pingpong(send, recv, i).await?;
            connection
        } else {
            match connecting.into_0rtt() {
                Ok((zrtt_connection, zrtt_accepted_fut)) => {
                    trace!("0-RTT possible from our side");
                    let (send, recv) = zrtt_connection.open_bi().await.e()?;
                    let zrtt_task = tokio::spawn(pingpong(send, recv, i));
                    match zrtt_accepted_fut.await {
                        true => {
                            zrtt_task.await;
                            zrtt_connection.into_connection().await
                        }
                        false => {
                            zrtt_task.abort();
                            let conn = zrtt_connection.into_connection.await;
                            let (send, recv) = conn.open_bi().await.e()?;
                            pingpong(send, recv, i).await?;
                            conn
                        }
                    }
                }
                Err(connecting) => {
                    trace!("0-RTT not possible from our side");
                    let conn = connecting.await.e()?;
                    let (send, recv) = conn.open_bi().await.e()?;
                    pingpong(send, recv, i).await?;
                    conn
                }
            }
        };
        tokio::spawn(async move {
            // wait for some time for the handshake to complete and the server
            // to send a NewSessionTicket. This is less than ideal, but we
            // don't have a better way to wait for the handshake to complete.
            tokio::time::sleep(connection.rtt() * 2).await;
            connection.close(0u8.into(), b"");
        });
        let elapsed = t0.elapsed();
        println!("round {i}: {} us", elapsed.as_micros());
    }
    let elapsed = t0.elapsed();
    println!("total time: {} us", elapsed.as_micros());
    println!(
        "time per round: {} us",
        elapsed.as_micros() / (args.rounds as u128)
    );
    Ok(())
}

It's kind of sad to use a task for this, but it was so much easier to express this way. Maybe there's a nice readable way to express this without a task that I didn't think off.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To explain what I'm trying here: remove the need for the Conn enum by passing streams directly to the 0-rtt using code-path.


impl Conn {
fn open_bi(&self) -> quinn::OpenBi<'_> {
match self {
Conn::ZRTT(conn) => conn.open_bi(),
Conn::Full(conn) => conn.open_bi(),
}
}

fn close(&self, error_code: VarInt, reason: &[u8]) {
match self {
Conn::ZRTT(conn) => conn.close(error_code, reason),
Conn::Full(conn) => conn.close(error_code, reason),
}
}

fn rtt(&self) -> Duration {
match self {
Conn::ZRTT(conn) => conn.rtt(),
Conn::Full(conn) => conn.rtt(),
}
}
}

async fn pingpong_0rtt(connecting: Connecting, i: u64) -> n0_snafu::Result<Conn> {
let connection = match connecting.into_0rtt() {
Ok((connection, accepted)) => {
trace!("0-RTT possible from our side");
let connection = Conn::ZRTT(connection);
pingpong(&connection, accepted, i).await?;
connection
}
Err(connecting) => {
trace!("0-RTT not possible from our side");
let connection = connecting.await.e()?;
let connection = Conn::Full(connection);
pingpong(&connection, future::ready(true), i).await?;
connection
}
Expand All @@ -106,6 +142,7 @@ async fn connect(args: Args) -> n0_snafu::Result<()> {
.await?;
let connection = if args.disable_0rtt {
let connection = connecting.await.e()?;
let connection = Conn::Full(connection);
trace!("connecting without 0-RTT");
pingpong(&connection, future::ready(true), i).await?;
connection
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/dht_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ async fn chat_server(args: Args) -> n0_snafu::Result<()> {
};
tokio::spawn(async move {
let connection = connecting.await.e()?;
let remote_endpoint_id = connection.remote_id()?;
let remote_endpoint_id = connection.remote_id();
println!("got connection from {remote_endpoint_id}");
// just leave the tasks hanging. this is just an example.
let (mut writer, mut reader) = connection.accept_bi().await.e()?;
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/echo-no-router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ async fn start_accept_side() -> Result<Endpoint> {
let connection = incoming.await.e()?;

// We can get the remote's endpoint id from the connection.
let endpoint_id = connection.remote_id()?;
let endpoint_id = connection.remote_id();
println!("accepted connection from {endpoint_id}");

// Our protocol is a simple request-response protocol, so we expect the
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/echo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl ProtocolHandler for Echo {
/// the connection lasts.
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
// We can get the remote's endpoint id from the connection.
let endpoint_id = connection.remote_id()?;
let endpoint_id = connection.remote_id();
println!("accepted connection from {endpoint_id}");

// Our protocol is a simple request-response protocol, so we expect the
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/listen-unreliable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn main() -> Result<()> {
};
let alpn = connecting.alpn().await?;
let conn = connecting.await.e()?;
let endpoint_id = conn.remote_id()?;
let endpoint_id = conn.remote_id();
info!(
"new (unreliable) connection from {endpoint_id} with ALPN {}",
String::from_utf8_lossy(&alpn),
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/listen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ async fn main() -> n0_snafu::Result<()> {
};
let alpn = connecting.alpn().await?;
let conn = connecting.await.e()?;
let endpoint_id = conn.remote_id()?;
let endpoint_id = conn.remote_id();
info!(
"new connection from {endpoint_id} with ALPN {}",
String::from_utf8_lossy(&alpn),
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/screening-connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl ProtocolHandler for ScreenedEcho {
/// the connection lasts.
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
// We can get the remote's endpoint id from the connection.
let endpoint_id = connection.remote_id()?;
let endpoint_id = connection.remote_id();
println!("accepted connection from {endpoint_id}");

// Our protocol is a simple request-response protocol, so we expect the
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl ProtocolHandler for BlobSearch {
/// the connection lasts.
async fn accept(&self, connection: Connection) -> Result<(), AcceptError> {
// We can get the remote's endpoint id from the connection.
let endpoint_id = connection.remote_id()?;
let endpoint_id = connection.remote_id();
println!("accepted connection from {endpoint_id}");

// Our protocol is a simple request-response protocol, so we expect the
Expand Down
2 changes: 1 addition & 1 deletion iroh/examples/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ async fn provide(endpoint: Endpoint, size: u64) -> Result<()> {
let endpoint_clone = endpoint.clone();
tokio::spawn(async move {
let conn = connecting.await.e()?;
let endpoint_id = conn.remote_id()?;
let endpoint_id = conn.remote_id();
info!(
"new connection from {endpoint_id} with ALPN {}",
String::from_utf8_lossy(TRANSFER_ALPN),
Expand Down
Loading
Loading