Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
972 changes: 596 additions & 376 deletions Cargo.lock

Large diffs are not rendered by default.

26 changes: 13 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ tracing = { workspace = true, optional = true }
# used to ser/de messages when using rpc
postcard = { workspace = true, features = ["alloc", "use-std"], optional = true }
# currently only transport when using rpc
quinn = { workspace = true, optional = true }
noq = { workspace = true, optional = true }
# used as a buffer for serialization when using rpc
smallvec = { version = "1.14.0", features = ["write"], optional = true }
# used in the test utils to generate quinn endpoints
# used in the test utils to generate noq endpoints
rustls = { version = "0.23.5", default-features = false, features = ["std"], optional = true }
# used in the test utils to generate quinn endpoints
# used in the test utils to generate noq endpoints
rcgen = { version = "0.14.5", optional = true }
# used in the benches
futures-buffered ={ version = "0.2.9", optional = true }
Expand All @@ -46,7 +46,7 @@ futures-util = { workspace = true, optional = true }
irpc-derive = { version = "0.9.0", path = "./irpc-derive", optional = true }

[target.'cfg(not(all(target_family = "wasm", target_os = "unknown")))'.dependencies]
quinn = { workspace = true, optional = true, features = ["runtime-tokio"] }
noq = { workspace = true, optional = true, features = ["runtime-tokio"] }

[dev-dependencies]
tracing-subscriber = { workspace = true, features = ["fmt"] }
Expand All @@ -64,31 +64,31 @@ anyhow = { workspace = true }

[features]
# enable the remote transport
rpc = ["dep:quinn", "dep:postcard", "dep:smallvec", "dep:tracing", "tokio/io-util"]
rpc = ["dep:noq", "dep:postcard", "dep:smallvec", "dep:tracing", "tokio/io-util"]
# add test utilities
quinn_endpoint_setup = ["rpc", "dep:rustls", "dep:rcgen", "dep:futures-buffered", "quinn/rustls-ring"]
noq_endpoint_setup = ["rpc", "dep:rustls", "dep:rcgen", "dep:futures-buffered", "noq/rustls-ring"]
# pick up parent span when creating channel messages
spans = ["dep:tracing"]
stream = ["dep:futures-util"]
derive = ["dep:irpc-derive"]
varint-util = ["dep:postcard", "dep:smallvec", "tokio/io-util"]
default = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"]
default = ["rpc", "noq_endpoint_setup", "spans", "stream", "derive"]

[[example]]
name = "derive"
required-features = ["rpc", "derive", "quinn_endpoint_setup"]
required-features = ["rpc", "derive", "noq_endpoint_setup"]

[[example]]
name = "compute"
required-features = ["rpc", "derive", "quinn_endpoint_setup"]
required-features = ["rpc", "derive", "noq_endpoint_setup"]

[[example]]
name = "local"
required-features = ["derive"]

[[example]]
name = "storage"
required-features = ["rpc", "quinn_endpoint_setup"]
required-features = ["rpc", "noq_endpoint_setup"]

[workspace]
members = ["irpc-derive", "irpc-iroh"]
Expand All @@ -109,7 +109,7 @@ tracing = { version = "0.1.41", default-features = false }
n0-future = { version = "0.3", default-features = false }
n0-error = { version = "0.1" }
tracing-subscriber = { version = "0.3.20" }
iroh = { version = "0.96" }
iroh-base = { version = "0.96" }
quinn = { package = "iroh-quinn", version = "0.16.0", default-features = false }
iroh = { version = "0.97" }
iroh-base = { version = "0.97" }
noq = { version = "0.17.0", default-features = false }
futures-util = { version = "0.3", features = ["sink"] }
6 changes: 3 additions & 3 deletions examples/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,13 @@ struct ComputeApi {
}

impl ComputeApi {
pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> anyhow::Result<ComputeApi> {
pub fn connect(endpoint: noq::Endpoint, addr: SocketAddr) -> anyhow::Result<ComputeApi> {
Ok(ComputeApi {
inner: Client::quinn(endpoint, addr),
inner: Client::noq(endpoint, addr),
})
}

pub fn listen(&self, endpoint: quinn::Endpoint) -> anyhow::Result<AbortOnDropHandle<()>> {
pub fn listen(&self, endpoint: noq::Endpoint) -> anyhow::Result<AbortOnDropHandle<()>> {
let Some(local) = self.inner.as_local() else {
bail!("cannot listen on a remote service");
};
Expand Down
6 changes: 3 additions & 3 deletions examples/derive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ struct StorageApi {
}

impl StorageApi {
pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> Result<StorageApi> {
pub fn connect(endpoint: noq::Endpoint, addr: SocketAddr) -> Result<StorageApi> {
Ok(StorageApi {
inner: Client::quinn(endpoint, addr),
inner: Client::noq(endpoint, addr),
})
}

pub fn listen(&self, endpoint: quinn::Endpoint) -> Result<AbortOnDropHandle<()>> {
pub fn listen(&self, endpoint: noq::Endpoint) -> Result<AbortOnDropHandle<()>> {
let local = self
.inner
.as_local()
Expand Down
8 changes: 4 additions & 4 deletions examples/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ enum StorageMessage {
}

impl RemoteService for StorageProtocol {
fn with_remote_channels(self, rx: quinn::RecvStream, tx: quinn::SendStream) -> Self::Message {
fn with_remote_channels(self, rx: noq::RecvStream, tx: noq::SendStream) -> Self::Message {
match self {
StorageProtocol::Get(msg) => WithChannels::from((msg, tx, rx)).into(),
StorageProtocol::Set(msg) => WithChannels::from((msg, tx, rx)).into(),
Expand Down Expand Up @@ -125,13 +125,13 @@ struct StorageApi {
}

impl StorageApi {
pub fn connect(endpoint: quinn::Endpoint, addr: SocketAddr) -> anyhow::Result<StorageApi> {
pub fn connect(endpoint: noq::Endpoint, addr: SocketAddr) -> anyhow::Result<StorageApi> {
Ok(StorageApi {
inner: Client::quinn(endpoint, addr),
inner: Client::noq(endpoint, addr),
})
}

pub fn listen(&self, endpoint: quinn::Endpoint) -> anyhow::Result<AbortOnDropHandle<()>> {
pub fn listen(&self, endpoint: noq::Endpoint) -> anyhow::Result<AbortOnDropHandle<()>> {
let Some(local) = self.inner.as_local() else {
bail!("cannot listen on a remote service");
};
Expand Down
23 changes: 15 additions & 8 deletions irpc-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub fn rpc_requests(attr: TokenStream, item: TokenStream) -> TokenStream {
return error_tokens(
input.span(),
"The rpc_requests macro can only be applied to enums",
)
);
}
};

Expand Down Expand Up @@ -71,15 +71,22 @@ pub fn rpc_requests(attr: TokenStream, item: TokenStream) -> TokenStream {
Fields::Unnamed(ref mut fields) if fields.unnamed.len() == 1 => {
fields.unnamed[0].ty.clone()
}
_ => return error_tokens(
variant.span(),
"Each variant must either have exactly one unnamed field, or use the `wrap` argument in the `rpc` attribute.",
),
_ => {
return error_tokens(
variant.span(),
"Each variant must either have exactly one unnamed field, or use the `wrap` argument in the `rpc` attribute.",
);
}
},
Some(WrapArgs { ident, derive, vis }) => {
let vis = vis.as_ref().unwrap_or(&input.vis).clone();
let ty = type_from_ident(&ident);
let struc = struct_from_variant_fields(ident, variant.fields.clone(), variant.attrs.clone(), vis);
let struc = struct_from_variant_fields(
ident,
variant.fields.clone(),
variant.attrs.clone(),
vis,
);
wrapper_types.extend(quote! {
#[derive(::std::fmt::Debug, ::serde::Serialize, ::serde::Deserialize, #(#derive),* )]
#struc
Expand Down Expand Up @@ -299,8 +306,8 @@ fn generate_remote_service_impl(
impl ::irpc::rpc::RemoteService for #proto_enum_name {
fn with_remote_channels(
self,
rx: ::irpc::rpc::quinn::RecvStream,
tx: ::irpc::rpc::quinn::SendStream
rx: ::irpc::rpc::noq::RecvStream,
tx: ::irpc::rpc::noq::SendStream
) -> Self::Message {
match self {
#(#variants),*
Expand Down
16 changes: 11 additions & 5 deletions irpc-iroh/examples/0rtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
use anyhow::{Context, Result};
use clap::Parser;
use iroh::{
endpoint::{AfterHandshakeOutcome, ConnectionInfo, EndpointHooks},
endpoint::{presets, AfterHandshakeOutcome, ConnectionInfo, EndpointHooks},
protocol::Router,
Endpoint, EndpointAddr, EndpointId, SecretKey,
};
Expand All @@ -23,7 +23,10 @@ async fn main() -> Result<()> {
cli::Args::Listen { no_0rtt } => {
let (server_router, server_addr) = {
let secret_key = get_or_generate_secret_key()?;
let endpoint = Endpoint::builder().secret_key(secret_key).bind().await?;
let endpoint = Endpoint::builder(presets::N0)
.secret_key(secret_key)
.bind()
.await?;
endpoint.online().await;
let addr = endpoint.addr();
let api = EchoApi::spawn();
Expand Down Expand Up @@ -54,7 +57,9 @@ async fn main() -> Result<()> {
wait_for_ticket,
} => {
if !no_0rtt && !wait_for_ticket {
eprintln!("0-RTT is enabled but wait_for_ticket is not set. After 2 requests with 0rtt the 0rtt resumption tickets will be consumed and a connection will be done without 0rtt.");
eprintln!(
"0-RTT is enabled but wait_for_ticket is not set. After 2 requests with 0rtt the 0rtt resumption tickets will be consumed and a connection will be done without 0rtt."
);
}
let n = n
.iter()
Expand All @@ -63,7 +68,7 @@ async fn main() -> Result<()> {
.unwrap_or(u64::MAX);
let delay = std::time::Duration::from_millis(delay_ms);
let connection_stats = ConnectionStats::default();
let endpoint = Endpoint::builder()
let endpoint = Endpoint::builder(presets::N0)
.hooks(connection_stats.clone())
.bind()
.await?;
Expand Down Expand Up @@ -96,7 +101,8 @@ impl ConnectionStats {
let stats = self.0.lock().expect("poisoned");
stats
.get(endpoint_id)
.and_then(|conn_info| conn_info.selected_path().map(|path| path.rtt()))
.and_then(|conn_info| conn_info.selected_path())
.and_then(|path| path.rtt())
}
}

Expand Down
10 changes: 5 additions & 5 deletions irpc-iroh/examples/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! * Authenticating peers

use anyhow::Result;
use iroh::{protocol::Router, Endpoint};
use iroh::{endpoint::presets, protocol::Router, Endpoint};

use self::storage::{StorageClient, StorageServer};

Expand All @@ -18,7 +18,7 @@ async fn main() -> Result<()> {

async fn remote() -> Result<()> {
let (server_router, server_addr) = {
let endpoint = Endpoint::bind().await?;
let endpoint = Endpoint::bind(presets::N0).await?;
let server = StorageServer::new("secret".to_string());
let router = Router::builder(endpoint.clone())
.accept(StorageServer::ALPN, server.clone())
Expand All @@ -28,7 +28,7 @@ async fn remote() -> Result<()> {
};

// correct authentication
let client_endpoint = Endpoint::builder().bind().await?;
let client_endpoint = Endpoint::builder(presets::N0).bind().await?;
let api = StorageClient::connect(client_endpoint, server_addr.clone());
api.auth("secret").await?;
api.set("hello".to_string(), "world".to_string()).await?;
Expand All @@ -41,13 +41,13 @@ async fn remote() -> Result<()> {
}

// invalid authentication
let client_endpoint = Endpoint::builder().bind().await?;
let client_endpoint = Endpoint::builder(presets::N0).bind().await?;
let api = StorageClient::connect(client_endpoint, server_addr.clone());
assert!(api.auth("bad").await.is_err());
assert!(api.get("hello".to_string()).await.is_err());

// no authentication
let client_endpoint = Endpoint::builder().bind().await?;
let client_endpoint = Endpoint::builder(presets::N0).bind().await?;
let api = StorageClient::connect(client_endpoint, server_addr);
assert!(api.get("hello".to_string()).await.is_err());

Expand Down
6 changes: 3 additions & 3 deletions irpc-iroh/examples/remote-and-local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//! The [`StorageApi`] struct is only defined once and can be used both locally and as a remote client.

use anyhow::Result;
use iroh::{protocol::Router, Endpoint};
use iroh::{endpoint::presets, protocol::Router, Endpoint};

use self::storage::StorageApi;

Expand Down Expand Up @@ -39,15 +39,15 @@ async fn local() -> Result<()> {
}

async fn remote() -> Result<()> {
let endpoint = Endpoint::bind().await?;
let endpoint = Endpoint::bind(presets::N0).await?;
let api = StorageApi::spawn();
let router = Router::builder(endpoint.clone())
.accept(StorageApi::ALPN, api.protocol_handler()?)
.spawn();

endpoint.online().await;

let client_endpoint = Endpoint::builder().bind().await?;
let client_endpoint = Endpoint::bind(presets::N0).await?;
let api = StorageApi::connect(client_endpoint, endpoint.addr())?;
api.set("hello".to_string(), "world".to_string()).await?;
api.set("goodbye".to_string(), "world".to_string()).await?;
Expand Down
6 changes: 3 additions & 3 deletions irpc-iroh/examples/server-actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod proto {
use std::collections::HashMap;

use anyhow::Result;
use iroh::{protocol::Router, Endpoint, EndpointId};
use iroh::{endpoint::presets, protocol::Router, Endpoint, EndpointId};
use irpc::{channel::oneshot, rpc_requests, Client, WithChannels};
use irpc_iroh::IrohProtocol;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -41,7 +41,7 @@ mod proto {
tokio::task::spawn(actor(rx));
let client = Client::<FooProtocol>::local(tx);

let endpoint = Endpoint::bind().await?;
let endpoint = Endpoint::bind(presets::N0).await?;
let protocol = IrohProtocol::with_sender(client.as_local().unwrap());
let router = Router::builder(endpoint).accept(ALPN, protocol).spawn();
println!("endpoint id: {}", router.endpoint().id());
Expand Down Expand Up @@ -79,7 +79,7 @@ mod proto {

pub async fn connect(endpoint_id: EndpointId) -> Result<Client<FooProtocol>> {
println!("connecting to {endpoint_id}");
let endpoint = Endpoint::bind().await?;
let endpoint = Endpoint::bind(presets::N0).await?;
let client = irpc_iroh::client(endpoint, endpoint_id, ALPN);
Ok(client)
}
Expand Down
8 changes: 4 additions & 4 deletions irpc-iroh/examples/server-shared-state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
//! on the server side instead of with an actor loop.

use anyhow::Result;
use iroh::{protocol::Router, Endpoint};
use iroh::{endpoint::presets, protocol::Router, Endpoint};

use self::storage::{StorageClient, StorageServer};

Expand All @@ -12,7 +12,7 @@ async fn main() -> Result<()> {

// Start the server.
let (server_router, server_addr) = {
let endpoint = Endpoint::bind().await?;
let endpoint = Endpoint::bind(presets::N0).await?;
let storage = StorageServer::default();
let router = Router::builder(endpoint)
.accept(storage::ALPN, storage)
Expand All @@ -22,7 +22,7 @@ async fn main() -> Result<()> {
};

// Connect by passing an endpoint, which allows automatic reconnection.
let client_endpoint = Endpoint::bind().await?;
let client_endpoint = Endpoint::bind(presets::N0).await?;
let api = StorageClient::connect(client_endpoint, server_addr.clone());
api.set("hello", "world").await?;
api.set("goodbye", "see you soon").await?;
Expand All @@ -34,7 +34,7 @@ async fn main() -> Result<()> {
}

// Or create a client from a connection directly.
let client2 = Endpoint::bind().await?;
let client2 = Endpoint::bind(presets::N0).await?;
let conn = client2.connect(server_addr, storage::ALPN).await?;
let api = StorageClient::from_connection(conn);
let value = api.get("goodbye").await?;
Expand Down
Loading
Loading