diff --git a/src/conn/pool/futures/get_conn.rs b/src/conn/pool/futures/get_conn.rs index b89f9bc6..ba27f152 100644 --- a/src/conn/pool/futures/get_conn.rs +++ b/src/conn/pool/futures/get_conn.rs @@ -6,19 +6,7 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. -use std::{ - fmt, - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -use futures_core::ready; -#[cfg(feature = "tracing")] -use { - std::sync::Arc, - tracing::{debug_span, Span}, -}; +use std::fmt; use crate::{ conn::{ @@ -55,30 +43,14 @@ impl fmt::Debug for GetConnInner { } } -/// This future will take connection from a pool and resolve to [`Conn`]. #[derive(Debug)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct GetConn { - pub(crate) queue_id: QueueId, - pub(crate) pool: Option, - pub(crate) inner: GetConnInner, - reset_upon_returning_to_a_pool: bool, - #[cfg(feature = "tracing")] - span: Arc, +struct GetConnState { + queue_id: QueueId, + pool: Option, + inner: GetConnInner, } -impl GetConn { - pub(crate) fn new(pool: &Pool, reset_upon_returning_to_a_pool: bool) -> GetConn { - GetConn { - queue_id: QueueId::next(), - pool: Some(pool.clone()), - inner: GetConnInner::New, - reset_upon_returning_to_a_pool, - #[cfg(feature = "tracing")] - span: Arc::new(debug_span!("mysql_async::get_conn")), - } - } - +impl GetConnState { fn pool_mut(&mut self) -> &mut Pool { self.pool .as_mut() @@ -92,78 +64,75 @@ impl GetConn { } } -// this manual implementation of Future may seem stupid, but we sort -// of need it to get the dropping behavior we want. -impl Future for GetConn { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - #[cfg(feature = "tracing")] - let span = self.span.clone(); - #[cfg(feature = "tracing")] - let _span_guard = span.enter(); - loop { - match self.inner { - GetConnInner::New => { - let queue_id = self.queue_id; - let next = ready!(self.pool_mut().poll_new_conn(cx, queue_id))?; - match next { - GetConnInner::Connecting(conn_fut) => { - self.inner = GetConnInner::Connecting(conn_fut); - } - GetConnInner::Checking(conn_fut) => { - self.inner = GetConnInner::Checking(conn_fut); - } - GetConnInner::Done => unreachable!( - "Pool::poll_new_conn never gives out already-consumed GetConns" - ), - GetConnInner::New => { - unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") - } +/// This future will take connection from a pool and resolve to [`Conn`]. +#[cfg_attr(feature = "tracing", tracing::instrument(level = "debug", skip_all))] +pub(crate) async fn get_conn(pool: Pool) -> Result { + let reset_upon_returning_to_a_pool = pool.opts.pool_opts().reset_connection(); + let queue_id = QueueId::next(); + let mut state = GetConnState { + queue_id, + pool: Some(pool), + inner: GetConnInner::New, + }; + + loop { + match state.inner { + GetConnInner::New => { + let pool = state.pool_mut(); + let next = pool.new_conn(queue_id).await?; + match next { + GetConnInner::Connecting(conn_fut) => { + state.inner = GetConnInner::Connecting(conn_fut); + } + GetConnInner::Checking(conn_fut) => { + state.inner = GetConnInner::Checking(conn_fut); + } + GetConnInner::Done => unreachable!( + "Pool::poll_new_conn never gives out already-consumed GetConns" + ), + GetConnInner::New => { + unreachable!("Pool::poll_new_conn never gives out GetConnInner::New") } } - GetConnInner::Done => { - unreachable!("GetConn::poll polled after returning Async::Ready"); - } - GetConnInner::Connecting(ref mut f) => { - let result = ready!(Pin::new(f).poll(cx)); - let pool = self.pool_take(); - - self.inner = GetConnInner::Done; - - return match result { - Ok(mut c) => { - c.inner.pool = Some(pool); - c.inner.reset_upon_returning_to_a_pool = - self.reset_upon_returning_to_a_pool; - Poll::Ready(Ok(c)) - } - Err(e) => { - pool.cancel_connection(); - Poll::Ready(Err(e)) - } - }; - } - GetConnInner::Checking(ref mut f) => { - let result = ready!(Pin::new(f).poll(cx)); - match result { - Ok(mut c) => { - self.inner = GetConnInner::Done; - - let pool = self.pool_take(); - c.inner.pool = Some(pool); - c.inner.reset_upon_returning_to_a_pool = - self.reset_upon_returning_to_a_pool; - return Poll::Ready(Ok(c)); - } - Err(_) => { - // Idling connection is broken. We'll drop it and try again. - self.inner = GetConnInner::New; + } + GetConnInner::Done => { + unreachable!("GetConn::poll polled after returning Async::Ready"); + } + GetConnInner::Connecting(ref mut f) => { + let result = f.await; + let pool = state.pool_take(); + state.inner = GetConnInner::Done; + + return match result { + Ok(mut c) => { + c.inner.pool = Some(pool); + c.inner.reset_upon_returning_to_a_pool = reset_upon_returning_to_a_pool; + Ok(c) + } + Err(e) => { + pool.cancel_connection(); + Err(e) + } + }; + } + GetConnInner::Checking(ref mut f) => { + let result = f.await; + match result { + Ok(mut c) => { + state.inner = GetConnInner::Done; + + let pool = state.pool_take(); + c.inner.pool = Some(pool); + c.inner.reset_upon_returning_to_a_pool = reset_upon_returning_to_a_pool; + return Ok(c); + } + Err(_) => { + // Idling connection is broken. We'll drop it and try again. + state.inner = GetConnInner::New; - let pool = self.pool_mut(); - pool.cancel_connection(); - continue; - } + let pool = state.pool_mut(); + pool.cancel_connection(); + continue; } } } @@ -171,7 +140,7 @@ impl Future for GetConn { } } -impl Drop for GetConn { +impl Drop for GetConnState { fn drop(&mut self) { // We drop a connection before it can be resolved, a.k.a. cancelling it. // Make sure we maintain the necessary invariants towards the pool. diff --git a/src/conn/pool/futures/mod.rs b/src/conn/pool/futures/mod.rs index 00842994..6cf18080 100644 --- a/src/conn/pool/futures/mod.rs +++ b/src/conn/pool/futures/mod.rs @@ -6,8 +6,8 @@ // option. All files in the project carrying such notice may not be copied, // modified, or distributed except according to those terms. +pub use self::disconnect_pool::DisconnectPool; +pub(crate) use self::get_conn::get_conn; pub(super) use self::get_conn::GetConnInner; -pub use self::{disconnect_pool::DisconnectPool, get_conn::GetConn}; - mod disconnect_pool; mod get_conn; diff --git a/src/conn/pool/mod.rs b/src/conn/pool/mod.rs index de685441..772480c1 100644 --- a/src/conn/pool/mod.rs +++ b/src/conn/pool/mod.rs @@ -14,6 +14,8 @@ use std::{ borrow::Borrow, cmp::Reverse, collections::VecDeque, + future::poll_fn, + future::Future, hash::{Hash, Hasher}, str::FromStr, sync::{atomic, Arc, Mutex}, @@ -272,9 +274,8 @@ impl Pool { } /// Async function that resolves to `Conn`. - pub fn get_conn(&self) -> GetConn { - let reset_connection = self.opts.pool_opts().reset_connection(); - GetConn::new(self, reset_connection) + pub fn get_conn(&self) -> impl Future> { + get_conn(self.clone()) } /// Starts a new transaction. @@ -286,7 +287,7 @@ impl Pool { /// Async function that disconnects this pool from the server and resolves to `()`. /// /// **Note:** This Future won't resolve until all active connections, taken from it, - /// are dropped or disonnected. Also all pending and new `GetConn`'s will resolve to error. + /// are dropped or disonnected. Also all pending and new `get_conn()`'s will resolve to error. pub fn disconnect(self) -> DisconnectPool { DisconnectPool::new(self) } @@ -336,19 +337,41 @@ impl Pool { } } - /// Poll the pool for an available connection. - fn poll_new_conn( - &mut self, - cx: &mut Context<'_>, - queue_id: QueueId, - ) -> Poll> { + fn queue(&mut self, cx: &mut Context<'_>, queue_id: QueueId) -> Poll<()> { + let mut exchange = self.inner.exchange.lock().unwrap(); + exchange.waiting.push(cx.waker().clone(), queue_id); + Poll::Ready(()) + } + + fn poll_higher_priority(&mut self, cx: &mut Context<'_>, queue_id: QueueId) -> Poll<()> { + let mut exchange = self.inner.exchange.lock().unwrap(); + let highest = if let Some(cur) = exchange.waiting.peek_id() { + queue_id > cur + } else { + true + }; + if highest { + Poll::Ready(()) + } else { + // to make sure the waker is updated + exchange.waiting.push(cx.waker().clone(), queue_id); + Poll::Pending + } + } + + async fn queue_and_wait(&mut self, queue_id: QueueId) { + poll_fn(|cx| self.queue(cx, queue_id)).await; + poll_fn(|cx| self.poll_higher_priority(cx, queue_id)).await; + } + + fn try_new_conn(&mut self, queue_id: QueueId) -> Result> { let mut exchange = self.inner.exchange.lock().unwrap(); // NOTE: this load must happen while we hold the lock, // otherwise the recycler may choose to exit, see that .exist == 0, and then exit, // and then we decide to create a new connection, which would then never be torn down. if self.inner.close.load(atomic::Ordering::Acquire) { - return Err(Error::Driver(DriverError::PoolDisconnected)).into(); + return Err(Error::Driver(DriverError::PoolDisconnected)); } exchange.spawn_futures_if_needed(&self.inner); @@ -362,8 +385,7 @@ impl Pool { // If we are not, just queue if !highest { - exchange.waiting.push(cx.waker().clone(), queue_id); - return Poll::Pending; + return Ok(None); } #[allow(unused_variables)] // `since` is only used when `hdrhistogram` is enabled @@ -379,7 +401,7 @@ impl Pool { #[cfg(feature = "hdrhistogram")] let metrics = self.metrics(); conn.inner.active_since = Instant::now(); - return Poll::Ready(Ok(GetConnInner::Checking( + return Ok(Some(GetConnInner::Checking( async move { conn.stream_mut()?.check().await?; #[cfg(feature = "hdrhistogram")] @@ -419,7 +441,7 @@ impl Pool { #[cfg(feature = "hdrhistogram")] let metrics = self.metrics(); - return Poll::Ready(Ok(GetConnInner::Connecting( + return Ok(Some(GetConnInner::Connecting( async move { let conn = Conn::new(opts).await; #[cfg(feature = "hdrhistogram")] @@ -437,10 +459,17 @@ impl Pool { .boxed(), ))); } + Ok(None) + } - // Polled, but no conn available? Back into the queue. - exchange.waiting.push(cx.waker().clone(), queue_id); - Poll::Pending + /// Get a new connection from the pool. + async fn new_conn(&mut self, queue_id: QueueId) -> Result { + loop { + if let Some(conn) = self.try_new_conn(queue_id)? { + return Ok(conn); + } + self.queue_and_wait(queue_id).await; + } } fn unqueue(&self, queue_id: QueueId) { diff --git a/src/lib.rs b/src/lib.rs index 4ad9d735..50dac1d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -558,7 +558,7 @@ pub use crate::connection_like::{Connection, ToConnectionResult}; /// Futures used in this crate pub mod futures { - pub use crate::conn::pool::futures::{DisconnectPool, GetConn}; + pub use crate::conn::pool::futures::DisconnectPool; } /// Traits used in this crate diff --git a/tests/exports.rs b/tests/exports.rs index 6f9feef8..fb1fcd32 100644 --- a/tests/exports.rs +++ b/tests/exports.rs @@ -1,7 +1,7 @@ #[allow(unused_imports)] use mysql_async::{ consts, from_row, from_row_opt, from_value, from_value_opt, - futures::{DisconnectPool, GetConn}, + futures::DisconnectPool, params, prelude::{ BatchQuery, FromRow, FromValue, GlobalHandler, Protocol, Query, Queryable, StatementLike,