
Conversation

@sneaxhuh

Fixes #6157

When opening hundreds of streams per second, dial requests were being silently dropped because try_send() would fail when the channel was full, causing Control::open_stream() to hang indefinitely.

This commit implements backpressure propagation as suggested by maintainers:

  1. Increased dial_sender channel buffer from 0 (rendezvous) to 32 to handle burst traffic without blocking on every request.

  2. Replaced try_send() with send().await in the dial path to propagate backpressure when >32 dial requests are pending, instead of silently dropping them.

  3. Refactored Shared::sender() to return a cloned dial_sender, allowing the async send operation to happen outside the mutex lock to prevent deadlocks.

With these changes:

  • Up to 32 concurrent dial requests queue immediately
  • Additional requests block until space is available (backpressure)
  • Errors are properly propagated instead of silent failures
  • No more indefinite hangs on dropped dial requests
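The try_send-vs-send distinction at the heart of the fix can be sketched with std's bounded `sync_channel` standing in for the async dial channel (illustrative names only, not the actual stream-behaviour types; blocking `send` plays the role of `send().await`):

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

// Contrast try_send (the silent-drop failure mode when its error is
// ignored) with a blocking send that propagates backpressure.
fn demonstrate_backpressure() -> Vec<u32> {
    let (tx, rx) = sync_channel(1);
    tx.try_send(1u32).unwrap(); // buffer now full

    // try_send on a full channel errors immediately; if the error is
    // ignored, the request is silently dropped (the original bug).
    assert!(matches!(tx.try_send(2), Err(TrySendError::Full(2))));

    // A blocking send (analogue of `send().await`) parks the sender
    // until the receiver drains an item: backpressure, not a drop.
    let handle = thread::spawn(move || tx.send(2).unwrap());
    let first = rx.recv().unwrap(); // draining unblocks the sender
    handle.join().unwrap();
    let second = rx.recv().unwrap();
    vec![first, second]
}
```

With a capacity-32 channel, the same mechanics mean up to 32 requests queue instantly and the 33rd waits instead of vanishing.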


Signed-off-by: sneax <[email protected]>
Member

@elenaf9 elenaf9 left a comment


Thanks for opening the PR @sneaxhuh.

```diff
 }

-pub(crate) fn sender(&mut self, peer: PeerId) -> mpsc::Sender<NewStream> {
+pub(crate) fn sender(&mut self, peer: PeerId) -> (mpsc::Sender<NewStream>, Option<(PeerId, mpsc::Sender<PeerId>)>) {
```
Member


Can we make this function async and do the send inside it, instead of returning the Sender?

Author


please have a look now!

Member


Thanks for the quick follow-up.
Why is it not possible to make the sender fn async?

Author


`Shared::sender` needs a `&mut self`, which we only have while holding the `MutexGuard`. If we make the whole function async, the guard stays alive across an `.await`, which blocks other parts of the code that need `Shared`, and sometimes the compiler won't even allow it. So we grab what we need under the lock, drop the guard, and then do the async send afterwards.
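The clone-under-the-lock, send-after-the-lock pattern described above can be sketched as follows, with std's blocking `sync_channel` standing in for the async mpsc sender and a hypothetical `Shared` struct (the real crate's types differ):

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for the real shared state; the bounded
// sync_channel plays the role of the async `mpsc::Sender<NewStream>`.
struct Shared {
    dial_sender: SyncSender<u64>,
}

fn open_stream(shared: &Arc<Mutex<Shared>>, peer: u64) {
    // Clone the sender while holding the lock, then drop the guard
    // *before* the potentially-blocking send, so other tasks can
    // still acquire `Shared` in the meantime.
    let sender = {
        let guard = shared.lock().unwrap();
        guard.dial_sender.clone()
    }; // guard dropped here

    // Blocking `send` stands in for `send().await`: it applies
    // backpressure instead of silently dropping when the buffer is full.
    sender.send(peer).unwrap();
}
```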

Member

@elenaf9 elenaf9 Nov 30, 2025


Yes, you're right, we'd need to manually return `-> impl Future<Output = mpsc::Sender<NewStream>>` and clone the dial_sender before returning the async block.

However, the larger issue with this is that each clone of the dial_sender increases the channel's capacity by one. So effectively, we create an unbounded channel with this and prevent backpressure.

Still, I agree that we shouldn't hold the lock while blocking on the future.
So I guess the way to do this would be to create a `Shared::poll_sender(PeerId, &mut Context<'_>) -> Poll<mpsc::Sender>` function and poll that in `Control::open_stream` with `poll_fn(|cx| Shared::lock(...).poll_sender(cx))` or something like that. Then we don't need to clone dial_sender.

Author


I tried implementing the poll_sender approach but ran into waker-handling issues with the mutex: tasks would hang because the waker registered during one lock acquisition didn't properly wake on subsequent polls. I switched to an unbounded channel approach instead. It fixes the silent-drop problem and avoids the mutex/waker complexity. Would you prefer I continue debugging the poll_sender approach, or is the unbounded solution acceptable?

Author


My bad, you already said we don't want an unbounded channel. I tried using `poll_fn` with `poll_ready()` to drop the lock between polls: `poll_fn(|cx| Shared::lock(&self.shared).poll_send_dial(cx, peer)).await`.
However, when `poll_ready()` returns `Poll::Pending`, the waker is registered while holding the mutex. When the channel becomes ready and tries to wake the task, the waker needs to acquire the same mutex to make progress, causing a deadlock. Is there anything I am missing on how to handle this? Guidance would be much appreciated.



Development

Successfully merging this pull request may close these issues.

Dial requests in stream behaviour are silently dropped
