diff --git a/Cargo.lock b/Cargo.lock index b9eb9ad..efef144 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ dependencies = [ [[package]] name = "spawned-concurrency" -version = "0.4.1" +version = "0.4.2" dependencies = [ "futures", "pin-project-lite", @@ -1210,7 +1210,7 @@ dependencies = [ [[package]] name = "spawned-rt" -version = "0.4.1" +version = "0.4.2" dependencies = [ "crossbeam", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 2b55e5e..667b687 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,12 +16,12 @@ members = [ ] [workspace.dependencies] -spawned-rt = { path = "rt", version = "0.4.1" } -spawned-concurrency = { path = "concurrency", version = "0.4.1" } +spawned-rt = { path = "rt", version = "0.4.2" } +spawned-concurrency = { path = "concurrency", version = "0.4.2" } tracing = { version = "0.1.41", features = ["log"] } tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } [workspace.package] -version = "0.4.1" +version = "0.4.2" license = "MIT" edition = "2021" diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index d516b1d..15108a1 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -4,8 +4,12 @@ use crate::{ error::GenServerError, tasks::InitResult::{NoSuccess, Success}, }; -use futures::future::FutureExt as _; -use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken}; +use core::pin::pin; +use futures::future::{self, FutureExt as _}; +use spawned_rt::{ + tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken, JoinHandle}, + threads, +}; use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration}; const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5); @@ -27,7 +31,7 @@ impl Clone for GenServerHandle { } impl GenServerHandle { - pub(crate) fn new(gen_server: G) -> Self { + fn new(gen_server: G) -> Self { let (tx, mut rx) = mpsc::channel::>(); let cancellation_token = CancellationToken::new(); let handle = GenServerHandle { @@ -51,7 +55,7 @@ impl GenServerHandle { handle_clone } - pub(crate) fn new_blocking(gen_server: G) -> Self { + fn new_blocking(gen_server: G) -> Self { let (tx, mut rx) = mpsc::channel::>(); let cancellation_token = CancellationToken::new(); let handle = GenServerHandle { @@ -70,6 +74,25 @@ impl GenServerHandle { handle_clone } + fn new_on_thread(gen_server: G) -> Self { + let (tx, mut rx) = mpsc::channel::>(); + let cancellation_token = CancellationToken::new(); + let handle = GenServerHandle { + tx, + cancellation_token, + }; + let handle_clone = handle.clone(); + // Ignore the JoinHandle for now. Maybe we'll use it in the future + let _join_handle = threads::spawn(|| { + threads::block_on(async move { + if let Err(error) = gen_server.run(&handle, &mut rx).await { + tracing::trace!(%error, "GenServer crashed") + }; + }) + }); + handle_clone + } + pub fn sender(&self) -> mpsc::Sender> { self.tx.clone() } @@ -153,6 +176,15 @@ pub trait GenServer: Send + Sized { GenServerHandle::new_blocking(self) } + /// For some "singleton" GenServers that run througout the whole execution of the + /// program, it makes sense to run in their own dedicated thread to avoid interference + /// with the rest of the tasks' runtime. + /// The use of tokio::task::spawm_blocking is not recommended for these scenarios + /// as it is a limited thread pool better suited for blocking IO tasks that eventually end + fn start_on_thread(self) -> GenServerHandle { + GenServerHandle::new_on_thread(self) + } + fn run( self, handle: &GenServerHandle, @@ -300,6 +332,36 @@ pub trait GenServer: Send + Sized { } } +/// Spawns a task that awaits on a future and sends a message to a GenServer +/// on completion. +/// This function returns a handle to the spawned task. +pub fn send_message_on( + handle: GenServerHandle, + future: U, + message: T::CastMsg, +) -> JoinHandle<()> +where + T: GenServer, + U: Future + Send + 'static, + ::Output: Send, +{ + let cancelation_token = handle.cancellation_token(); + let mut handle_clone = handle.clone(); + let join_handle = rt::spawn(async move { + let is_cancelled = pin!(cancelation_token.cancelled()); + let signal = pin!(future); + match future::select(is_cancelled, signal).await { + future::Either::Left(_) => tracing::debug!("GenServer stopped"), + future::Either::Right(_) => { + if let Err(e) = handle_clone.cast(message).await { + tracing::error!("Failed to send message: {e:?}") + } + } + } + }); + join_handle +} + #[cfg(debug_assertions)] mod warn_on_block { use super::*; diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 65ce84b..6936162 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -12,8 +12,8 @@ mod stream_tests; mod timer_tests; pub use gen_server::{ - CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, InitResult, - InitResult::NoSuccess, InitResult::Success, + send_message_on, CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, + InitResult, InitResult::NoSuccess, InitResult::Success, }; pub use process::{send, Process, ProcessInfo}; pub use stream::spawn_listener; diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs index ca954a7..981f5ab 100644 --- a/examples/blocking_genserver/main.rs +++ b/examples/blocking_genserver/main.rs @@ -99,7 +99,7 @@ impl GenServer for WellBehavedTask { pub fn main() { rt::run(async move { // If we change BadlyBehavedTask to start instead, it can stop the entire program - let mut badboy = BadlyBehavedTask::new().start_blocking(); + let mut badboy = BadlyBehavedTask::new().start_on_thread(); let _ = badboy.cast(()).await; let mut goodboy = WellBehavedTask::new(0).start(); let _ = goodboy.cast(()).await;