Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ members = [
]

[workspace.dependencies]
spawned-rt = { path = "rt", version = "0.4.1" }
spawned-concurrency = { path = "concurrency", version = "0.4.1" }
spawned-rt = { path = "rt", version = "0.4.2" }
spawned-concurrency = { path = "concurrency", version = "0.4.2" }
tracing = { version = "0.1.41", features = ["log"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }

[workspace.package]
version = "0.4.1"
version = "0.4.2"
license = "MIT"
edition = "2021"
70 changes: 66 additions & 4 deletions concurrency/src/tasks/gen_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ use crate::{
error::GenServerError,
tasks::InitResult::{NoSuccess, Success},
};
use futures::future::FutureExt as _;
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
use core::pin::pin;
use futures::future::{self, FutureExt as _};
use spawned_rt::{
tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken, JoinHandle},
threads,
};
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};

const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5);
Expand All @@ -27,7 +31,7 @@ impl<G: GenServer> Clone for GenServerHandle<G> {
}

impl<G: GenServer> GenServerHandle<G> {
pub(crate) fn new(gen_server: G) -> Self {
fn new(gen_server: G) -> Self {
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
let cancellation_token = CancellationToken::new();
let handle = GenServerHandle {
Expand All @@ -51,7 +55,7 @@ impl<G: GenServer> GenServerHandle<G> {
handle_clone
}

pub(crate) fn new_blocking(gen_server: G) -> Self {
fn new_blocking(gen_server: G) -> Self {
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
let cancellation_token = CancellationToken::new();
let handle = GenServerHandle {
Expand All @@ -70,6 +74,25 @@ impl<G: GenServer> GenServerHandle<G> {
handle_clone
}

fn new_on_thread(gen_server: G) -> Self {
let (tx, mut rx) = mpsc::channel::<GenServerInMsg<G>>();
let cancellation_token = CancellationToken::new();
let handle = GenServerHandle {
tx,
cancellation_token,
};
let handle_clone = handle.clone();
// Ignore the JoinHandle for now. Maybe we'll use it in the future
let _join_handle = threads::spawn(|| {
threads::block_on(async move {
if let Err(error) = gen_server.run(&handle, &mut rx).await {
tracing::trace!(%error, "GenServer crashed")
};
})
});
handle_clone
}

pub fn sender(&self) -> mpsc::Sender<GenServerInMsg<G>> {
self.tx.clone()
}
Expand Down Expand Up @@ -153,6 +176,15 @@ pub trait GenServer: Send + Sized {
GenServerHandle::new_blocking(self)
}

/// For some "singleton" GenServers that run througout the whole execution of the
/// program, it makes sense to run in their own dedicated thread to avoid interference
/// with the rest of the tasks' runtime.
/// The use of tokio::task::spawm_blocking is not recommended for these scenarios
/// as it is a limited thread pool better suited for blocking IO tasks that eventually end
fn start_on_thread(self) -> GenServerHandle<Self> {
GenServerHandle::new_on_thread(self)
}

fn run(
self,
handle: &GenServerHandle<Self>,
Expand Down Expand Up @@ -300,6 +332,36 @@ pub trait GenServer: Send + Sized {
}
}

/// Spawns a task that awaits on a future and sends a message to a GenServer
/// on completion.
/// This function returns a handle to the spawned task.
pub fn send_message_on<T, U>(
handle: GenServerHandle<T>,
future: U,
message: T::CastMsg,
) -> JoinHandle<()>
where
T: GenServer,
U: Future + Send + 'static,
<U as Future>::Output: Send,
{
let cancelation_token = handle.cancellation_token();
let mut handle_clone = handle.clone();
let join_handle = rt::spawn(async move {
let is_cancelled = pin!(cancelation_token.cancelled());
let signal = pin!(future);
match future::select(is_cancelled, signal).await {
future::Either::Left(_) => tracing::debug!("GenServer stopped"),
future::Either::Right(_) => {
if let Err(e) = handle_clone.cast(message).await {
tracing::error!("Failed to send message: {e:?}")
}
}
}
});
join_handle
}

#[cfg(debug_assertions)]
mod warn_on_block {
use super::*;
Expand Down
4 changes: 2 additions & 2 deletions concurrency/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ mod stream_tests;
mod timer_tests;

pub use gen_server::{
CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, InitResult,
InitResult::NoSuccess, InitResult::Success,
send_message_on, CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg,
InitResult, InitResult::NoSuccess, InitResult::Success,
};
pub use process::{send, Process, ProcessInfo};
pub use stream::spawn_listener;
Expand Down
2 changes: 1 addition & 1 deletion examples/blocking_genserver/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl GenServer for WellBehavedTask {
pub fn main() {
rt::run(async move {
// If we change BadlyBehavedTask to start instead, it can stop the entire program
let mut badboy = BadlyBehavedTask::new().start_blocking();
let mut badboy = BadlyBehavedTask::new().start_on_thread();
let _ = badboy.cast(()).await;
let mut goodboy = WellBehavedTask::new(0).start();
let _ = goodboy.cast(()).await;
Expand Down