@@ -114,6 +114,48 @@ const FAKE_OID: u32 = 0;
114114/// # } 
115115/// ``` 
116116/// 
117+ /// For more complex cases, an immutable reference to the connection need to be used: 
118+ /// ```rust 
119+ /// # include!("../doctest_setup.rs"); 
120+ /// use diesel_async::RunQueryDsl; 
121+ /// 
122+ /// # 
123+ /// # #[tokio::main(flavor = "current_thread")] 
124+ /// # async fn main() { 
125+ /// #     run_test().await.unwrap(); 
126+ /// # } 
127+ /// # 
128+ /// # async fn run_test() -> QueryResult<()> { 
129+ /// #     use diesel::sql_types::{Text, Integer}; 
130+ /// #     let conn = &mut establish_connection().await; 
131+ /// # 
132+ ///       async fn fn12(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> { 
133+ ///           let f1 = diesel::select(1_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn); 
134+ ///           let f2 = diesel::select(2_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn); 
135+ /// 
136+ ///           futures_util::try_join!(f1, f2) 
137+ ///       } 
138+ /// 
139+ ///       async fn fn34(mut conn: &AsyncPgConnection) -> QueryResult<(i32, i32)> { 
140+ ///           let f3 = diesel::select(3_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn); 
141+ ///           let f4 = diesel::select(4_i32.into_sql::<Integer>()).get_result::<i32>(&mut conn); 
142+ /// 
143+ ///           futures_util::try_join!(f3, f4) 
144+ ///       } 
145+ /// 
146+ ///       let f12 = fn12(&conn); 
147+ ///       let f34 = fn34(&conn); 
148+ /// 
149+ ///       let ((r1, r2), (r3, r4)) = futures_util::try_join!(f12, f34).unwrap(); 
150+ /// 
151+ ///       assert_eq!(r1, 1); 
152+ ///       assert_eq!(r2, 2); 
153+ ///       assert_eq!(r3, 3); 
154+ ///       assert_eq!(r4, 4); 
155+ ///       # Ok(()) 
156+ /// # } 
157+ /// ``` 
158+ /// 
117159/// ## TLS 
118160/// 
119161/// Connections created by [`AsyncPgConnection::establish`] do not support TLS. 
@@ -136,6 +178,12 @@ pub struct AsyncPgConnection {
136178} 
137179
138180impl  SimpleAsyncConnection  for  AsyncPgConnection  { 
181+     async  fn  batch_execute ( & mut  self ,  query :  & str )  -> QueryResult < ( ) >  { 
182+         SimpleAsyncConnection :: batch_execute ( & mut  & * self ,  query) . await 
183+     } 
184+ } 
185+ 
186+ impl  SimpleAsyncConnection  for  & AsyncPgConnection  { 
139187    async  fn  batch_execute ( & mut  self ,  query :  & str )  -> QueryResult < ( ) >  { 
140188        self . record_instrumentation ( InstrumentationEvent :: start_query ( & StrQueryHelper :: new ( 
141189            query, 
@@ -167,6 +215,38 @@ impl AsyncConnectionCore for AsyncPgConnection {
167215    type  Row < ' conn ,  ' query >  = PgRow ; 
168216    type  Backend  = diesel:: pg:: Pg ; 
169217
218+     fn  load < ' conn ,  ' query ,  T > ( & ' conn  mut  self ,  source :  T )  -> Self :: LoadFuture < ' conn ,  ' query > 
219+     where 
220+         T :  AsQuery  + ' query , 
221+         T :: Query :  QueryFragment < Self :: Backend >  + QueryId  + ' query , 
222+     { 
223+         AsyncConnectionCore :: load ( & mut  & * self ,  source) 
224+     } 
225+ 
226+     fn  execute_returning_count < ' conn ,  ' query ,  T > ( 
227+         & ' conn  mut  self , 
228+         source :  T , 
229+     )  -> Self :: ExecuteFuture < ' conn ,  ' query > 
230+     where 
231+         T :  QueryFragment < Self :: Backend >  + QueryId  + ' query , 
232+     { 
233+         AsyncConnectionCore :: execute_returning_count ( & mut  & * self ,  source) 
234+     } 
235+ } 
236+ 
237+ impl  AsyncConnectionCore  for  & AsyncPgConnection  { 
238+     type  LoadFuture < ' conn ,  ' query >  =
239+         <AsyncPgConnection  as  AsyncConnectionCore >:: LoadFuture < ' conn ,  ' query > ; 
240+ 
241+     type  ExecuteFuture < ' conn ,  ' query >  =
242+         <AsyncPgConnection  as  AsyncConnectionCore >:: ExecuteFuture < ' conn ,  ' query > ; 
243+ 
244+     type  Stream < ' conn ,  ' query >  = <AsyncPgConnection  as  AsyncConnectionCore >:: Stream < ' conn ,  ' query > ; 
245+ 
246+     type  Row < ' conn ,  ' query >  = <AsyncPgConnection  as  AsyncConnectionCore >:: Row < ' conn ,  ' query > ; 
247+ 
248+     type  Backend  = <AsyncPgConnection  as  AsyncConnectionCore >:: Backend ; 
249+ 
170250    fn  load < ' conn ,  ' query ,  T > ( & ' conn  mut  self ,  source :  T )  -> Self :: LoadFuture < ' conn ,  ' query > 
171251    where 
172252        T :  AsQuery  + ' query , 
@@ -942,11 +1022,15 @@ mod tests {
9421022    use  crate :: run_query_dsl:: RunQueryDsl ; 
9431023    use  diesel:: sql_types:: Integer ; 
9441024    use  diesel:: IntoSql ; 
1025+     use  futures_util:: future:: try_join; 
1026+     use  futures_util:: try_join; 
1027+     use  scoped_futures:: ScopedFutureExt ; 
9451028
9461029    #[ tokio:: test]  
9471030    async  fn  pipelining ( )  { 
9481031        let  database_url =
9491032            std:: env:: var ( "DATABASE_URL" ) . expect ( "DATABASE_URL must be set in order to run tests" ) ; 
1033+ 
9501034        let  mut  conn = crate :: AsyncPgConnection :: establish ( & database_url) 
9511035            . await 
9521036            . unwrap ( ) ; 
@@ -957,9 +1041,100 @@ mod tests {
9571041        let  f1 = q1. get_result :: < i32 > ( & mut  conn) ; 
9581042        let  f2 = q2. get_result :: < i32 > ( & mut  conn) ; 
9591043
960-         let  ( r1,  r2)  = futures_util:: try_join!( f1,  f2) . unwrap ( ) ; 
1044+         let  ( r1,  r2)  = try_join ! ( f1,  f2) . unwrap ( ) ; 
1045+ 
1046+         assert_eq ! ( r1,  1 ) ; 
1047+         assert_eq ! ( r2,  2 ) ; 
1048+     } 
1049+ 
1050+     #[ tokio:: test]  
1051+     async  fn  pipelining_with_composed_futures ( )  { 
1052+         let  database_url =
1053+             std:: env:: var ( "DATABASE_URL" ) . expect ( "DATABASE_URL must be set in order to run tests" ) ; 
1054+ 
1055+         let  conn = crate :: AsyncPgConnection :: establish ( & database_url) 
1056+             . await 
1057+             . unwrap ( ) ; 
1058+ 
1059+         async  fn  fn12 ( mut  conn :  & AsyncPgConnection )  -> QueryResult < ( i32 ,  i32 ) >  { 
1060+             let  f1 = diesel:: select ( 1_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut  conn) ; 
1061+             let  f2 = diesel:: select ( 2_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut  conn) ; 
1062+ 
1063+             try_join ! ( f1,  f2) 
1064+         } 
1065+ 
1066+         async  fn  fn34 ( mut  conn :  & AsyncPgConnection )  -> QueryResult < ( i32 ,  i32 ) >  { 
1067+             let  f3 = diesel:: select ( 3_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut  conn) ; 
1068+             let  f4 = diesel:: select ( 4_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut  conn) ; 
1069+ 
1070+             try_join ! ( f3,  f4) 
1071+         } 
1072+ 
1073+         let  f12 = fn12 ( & conn) ; 
1074+         let  f34 = fn34 ( & conn) ; 
1075+ 
1076+         let  ( ( r1,  r2) ,  ( r3,  r4) )  = try_join ! ( f12,  f34) . unwrap ( ) ; 
9611077
9621078        assert_eq ! ( r1,  1 ) ; 
9631079        assert_eq ! ( r2,  2 ) ; 
1080+         assert_eq ! ( r3,  3 ) ; 
1081+         assert_eq ! ( r4,  4 ) ; 
1082+     } 
1083+ 
1084+     #[ tokio:: test]  
1085+     async  fn  pipelining_with_composed_futures_and_transaction ( )  { 
1086+         let  database_url =
1087+             std:: env:: var ( "DATABASE_URL" ) . expect ( "DATABASE_URL must be set in order to run tests" ) ; 
1088+ 
1089+         let  mut  conn = crate :: AsyncPgConnection :: establish ( & database_url) 
1090+             . await 
1091+             . unwrap ( ) ; 
1092+ 
1093+         fn  erase < ' a ,  T :  Future  + Send  + ' a > ( t :  T )  -> impl  Future < Output  = T :: Output >  + Send  + ' a  { 
1094+             t
1095+         } 
1096+ 
1097+         async  fn  fn12 ( mut  conn :  & AsyncPgConnection )  -> QueryResult < ( i32 ,  i32 ) >  { 
1098+             let  f1 = diesel:: select ( 1_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut  conn) ; 
1099+             let  f2 = diesel:: select ( 2_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut  conn) ; 
1100+ 
1101+             erase ( try_join ( f1,  f2) ) . await 
1102+         } 
1103+ 
1104+         async  fn  fn34 ( mut  conn :  & AsyncPgConnection )  -> QueryResult < ( i32 ,  i32 ) >  { 
1105+             let  f3 = diesel:: select ( 3_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut  conn) ; 
1106+             let  f4 = diesel:: select ( 4_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut  conn) ; 
1107+ 
1108+             try_join ( f3,  f4) . boxed ( ) . await 
1109+         } 
1110+ 
1111+         async  fn  fn56 ( mut  conn :  & AsyncPgConnection )  -> QueryResult < ( i32 ,  i32 ) >  { 
1112+             let  f5 = diesel:: select ( 5_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut  conn) ; 
1113+             let  f6 = diesel:: select ( 6_i32 . into_sql :: < Integer > ( ) ) . get_result :: < i32 > ( & mut  conn) ; 
1114+ 
1115+             try_join ! ( f5. boxed( ) ,  f6. boxed( ) ) 
1116+         } 
1117+ 
1118+         conn. transaction ( |conn| { 
1119+             async  move  { 
1120+                 let  f12 = fn12 ( conn) ; 
1121+                 let  f34 = fn34 ( conn) ; 
1122+                 let  f56 = fn56 ( conn) ; 
1123+ 
1124+                 let  ( ( r1,  r2) ,  ( r3,  r4) ,  ( r5,  r6) )  = try_join ! ( f12,  f34,  f56) . unwrap ( ) ; 
1125+ 
1126+                 assert_eq ! ( r1,  1 ) ; 
1127+                 assert_eq ! ( r2,  2 ) ; 
1128+                 assert_eq ! ( r3,  3 ) ; 
1129+                 assert_eq ! ( r4,  4 ) ; 
1130+                 assert_eq ! ( r5,  5 ) ; 
1131+                 assert_eq ! ( r6,  6 ) ; 
1132+ 
1133+                 QueryResult :: < _ > :: Ok ( ( ) ) 
1134+             } 
1135+             . scope_boxed ( ) 
1136+         } ) 
1137+         . await 
1138+         . unwrap ( ) ; 
9641139    } 
9651140} 
0 commit comments