11use async_compat:: Compat ;
22use futures:: future:: { self } ;
33use futures_util:: FutureExt ;
4+ use http_body_util:: Empty ;
5+ use hyper:: body:: Bytes ;
6+ use hyper_util:: rt:: TokioIo ;
47use nginx_sys:: { ngx_http_core_loc_conf_t, NGX_LOG_ERR } ;
58use ngx:: async_:: resolver:: Resolver ;
69use ngx:: async_:: { spawn, Task } ;
@@ -12,13 +15,14 @@ use std::ptr::{addr_of, addr_of_mut, NonNull};
1215use std:: sync:: atomic:: { AtomicPtr , Ordering } ;
1316use std:: task:: Poll ;
1417use std:: time:: Instant ;
18+ use tokio:: net:: TcpStream ;
1519
1620use ngx:: core:: { self , Pool , Status } ;
1721use ngx:: ffi:: {
1822 ngx_array_push, ngx_command_t, ngx_conf_t, ngx_connection_t, ngx_http_handler_pt,
1923 ngx_http_module_t, ngx_http_phases_NGX_HTTP_ACCESS_PHASE, ngx_int_t, ngx_module_t,
20- ngx_post_event, ngx_posted_events, ngx_str_t, ngx_uint_t,
21- NGX_CONF_TAKE1 , NGX_HTTP_LOC_CONF , NGX_HTTP_LOC_CONF_OFFSET , NGX_HTTP_MODULE , NGX_LOG_EMERG ,
24+ ngx_post_event, ngx_posted_events, ngx_str_t, ngx_uint_t, NGX_CONF_TAKE1 , NGX_HTTP_LOC_CONF ,
25+ NGX_HTTP_LOC_CONF_OFFSET , NGX_HTTP_MODULE , NGX_LOG_EMERG ,
2226} ;
2327use ngx:: http:: { self , HTTPStatus , HttpModule , MergeConfigError , Request } ;
2428use ngx:: http:: { HttpModuleLocationConf , HttpModuleMainConf , NgxHttpCoreModule } ;
@@ -167,16 +171,54 @@ async fn resolve_something(
167171 )
168172}
169173
170- async fn request_something ( uri : & str ) -> ( String , String ) {
174+ async fn reqwest_something ( ) -> ( String , String ) {
171175 let start = Instant :: now ( ) ;
172- let _ = reqwest:: get ( uri )
176+ let _ = reqwest:: get ( "https://example.com" )
173177 . await
174178 . expect ( "response" )
175179 . text ( )
176180 . await
177181 . expect ( "body" ) ;
178182 (
179- format ! ( "X-Request-Time-{uri}" ) ,
183+ "X-Reqwest-Time" . to_string ( ) ,
184+ start. elapsed ( ) . as_millis ( ) . to_string ( ) ,
185+ )
186+ }
187+
188+ async fn hyper_something ( ) -> ( String , String ) {
189+ let start = Instant :: now ( ) ;
190+ let url = "http://httpbin.org/ip" . parse :: < hyper:: Uri > ( ) . expect ( "uri" ) ;
191+ let host = url. host ( ) . expect ( "uri has no host" ) ;
192+ let port = url. port_u16 ( ) . unwrap_or ( 80 ) ;
193+
194+ let address = format ! ( "{}:{}" , host, port) ;
195+
196+ let stream = TcpStream :: connect ( address) . await . expect ( "connect" ) ;
197+
198+ let io = TokioIo :: new ( stream) ;
199+
200+ // Create the Hyper client
201+ let ( mut sender, conn) = hyper:: client:: conn:: http1:: handshake ( io)
202+ . await
203+ . expect ( "handshake" ) ;
204+ // Spawn a task to poll the connection, driving the HTTP state
205+ let http_task = spawn ( async move {
206+ if let Err ( err) = conn. await {
207+ println ! ( "Connection failed: {:?}" , err) ;
208+ }
209+ } ) ;
210+ let authority = url. authority ( ) . unwrap ( ) . clone ( ) ;
211+ let req = hyper:: Request :: builder ( )
212+ . uri ( url)
213+ . header ( hyper:: header:: HOST , authority. as_str ( ) )
214+ . body ( Empty :: < Bytes > :: new ( ) )
215+ . expect ( "body" ) ;
216+ let _ = sender. send_request ( req) . await . expect ( "response" ) ;
217+
218+ http_task. cancel ( ) . await ;
219+
220+ (
221+ "X-Hyper-Time" . to_string ( ) ,
180222 start. elapsed ( ) . as_millis ( ) . to_string ( ) ,
181223 )
182224}
@@ -197,8 +239,9 @@ async fn async_access(request: &mut Request) -> Status {
197239 // yield_now
198240 Box :: pin( waste_yield( ) ) ,
199241 // reqwest
200- Box :: pin( request_something( "https://example.com" ) ) ,
201- Box :: pin( request_something( "https://example.org" ) ) ,
242+ Box :: pin( reqwest_something( ) ) ,
243+ // hyper
244+ Box :: pin( hyper_something( ) ) ,
202245 ] ;
203246 for ( header, value) in futures:: future:: join_all ( futs) . await {
204247 request. add_header_out ( & header, & value) ;
0 commit comments