@@ -8,10 +8,12 @@ use crate::rt::BoxedTaskHandle;
88use  crate :: rt:: Runtime ; 
99use  crate :: rt:: TcpOptions ; 
1010use  crate :: service:: Message ; 
11+ use  crate :: service:: MessageAllocator ; 
1112use  crate :: service:: Request  as  GrpcRequest ; 
1213use  crate :: service:: Response  as  GrpcResponse ; 
1314use  crate :: { client:: name_resolution:: TCP_IP_NETWORK_TYPE ,  service:: Service } ; 
1415use  bytes:: Bytes ; 
16+ use  bytes:: BytesMut ; 
1517use  http:: uri:: PathAndQuery ; 
1618use  http:: Request  as  HttpRequest ; 
1719use  http:: Response  as  HttpResponse ; 
@@ -63,7 +65,12 @@ impl Drop for TonicTransport {
6365
6466#[ async_trait]  
6567impl  Service  for  TonicTransport  { 
66-     async  fn  call ( & self ,  method :  String ,  request :  GrpcRequest )  -> GrpcResponse  { 
68+     async  fn  call ( 
69+         & self , 
70+         method :  String , 
71+         request :  GrpcRequest , 
72+         response_allocator :  Box < dyn  MessageAllocator > , 
73+     )  -> GrpcResponse  { 
6774        let  Ok ( path)  = PathAndQuery :: from_maybe_shared ( method)  else  { 
6875            let  err = Status :: internal ( "Failed to parse path" ) ; 
6976            return  create_error_response ( err) ; 
@@ -78,7 +85,7 @@ impl Service for TonicTransport {
7885        } ; 
7986        let  request = convert_request ( request) ; 
8087        let  response = grpc. streaming ( request,  path,  BytesCodec  { } ) . await ; 
81-         convert_response ( response) 
88+         convert_response ( response,  response_allocator ) 
8289    } 
8390} 
8491
@@ -92,9 +99,11 @@ fn convert_request(req: GrpcRequest) -> TonicRequest<Pin<Box<dyn Stream<Item = B
9299    let  ( metadata,  extensions,  stream)  = req. into_parts ( ) ; 
93100
94101    let  bytes_stream = Box :: pin ( stream. filter_map ( |msg| { 
95-         if  let  Ok ( bytes)  = ( msg as  Box < dyn  Any > ) . downcast :: < Bytes > ( )  { 
96-             Some ( * bytes) 
102+         let  mut  buf = BytesMut :: with_capacity ( msg. encoded_message_size_hint ( ) . unwrap_or ( 0 ) ) ; 
103+         if  let  Ok ( ( ) )  = msg. encode ( & mut  buf)  { 
104+             Some ( buf. freeze ( ) ) 
97105        }  else  { 
106+             // TODO: Handle encoding failures. 
98107            // If it fails, log the error and return None to filter it out. 
99108            eprintln ! ( "A message could not be downcast to Bytes and was skipped." ) ; 
100109            None 
@@ -104,7 +113,10 @@ fn convert_request(req: GrpcRequest) -> TonicRequest<Pin<Box<dyn Stream<Item = B
104113    TonicRequest :: from_parts ( metadata,  extensions,  bytes_stream as  _ ) 
105114} 
106115
107- fn  convert_response ( res :  Result < TonicResponse < Streaming < Bytes > > ,  Status > )  -> GrpcResponse  { 
116+ fn  convert_response ( 
117+     res :  Result < TonicResponse < Streaming < Bytes > > ,  Status > , 
118+     allocator :  Box < dyn  MessageAllocator > , 
119+ )  -> GrpcResponse  { 
108120    let  response = match  res { 
109121        Ok ( s)  => s, 
110122        Err ( e)  => { 
@@ -113,11 +125,14 @@ fn convert_response(res: Result<TonicResponse<Streaming<Bytes>>, Status>) -> Grp
113125        } 
114126    } ; 
115127    let  ( metadata,  stream,  extensions)  = response. into_parts ( ) ; 
116-     let  message_stream:  BoxStream < Box < dyn  Message > >  = Box :: pin ( stream. map ( |msg| { 
117-         msg. map ( |b| { 
118-             let  msg:  Box < dyn  Message >  = Box :: new ( b) ; 
119-             msg
120-         } ) 
128+     let  allocator:  Arc < dyn  MessageAllocator >  = Arc :: from ( allocator) ; 
129+     let  allocator_copy = allocator. clone ( ) ; 
130+     let  message_stream:  BoxStream < Box < dyn  Message > >  = Box :: pin ( stream. map ( move  |msg| { 
131+         let  allocator = allocator_copy. clone ( ) ; 
132+         let  buf = msg?; 
133+         let  mut  msg = allocator. allocate ( ) ; 
134+         msg. decode ( & buf) . map_err ( Status :: internal) ?; 
135+         Ok ( msg) 
121136    } ) ) ; 
122137    TonicResponse :: from_parts ( metadata,  message_stream,  extensions) 
123138} 
0 commit comments