@@ -129,7 +129,12 @@ impl Call {
129129 tag,
130130 )
131131 } ) ;
132- Ok ( ClientUnaryReceiver :: new ( call, cq_f, method. resp_de ( ) ) )
132+ Ok ( ClientUnaryReceiver :: new (
133+ call,
134+ cq_f,
135+ method. resp_de ( ) ,
136+ channel,
137+ ) )
133138 }
134139
135140 pub fn client_streaming < Req , Resp > (
@@ -151,11 +156,12 @@ impl Call {
151156 } ) ;
152157
153158 let share_call = Arc :: new ( SpinLock :: new ( ShareCall :: new ( call, cq_f) ) ) ;
154- let sink = ClientCStreamSender :: new ( share_call. clone ( ) , method. req_ser ( ) ) ;
159+ let sink = ClientCStreamSender :: new ( share_call. clone ( ) , method. req_ser ( ) , channel ) ;
155160 let recv = ClientCStreamReceiver {
156161 call : share_call,
157162 resp_de : method. resp_de ( ) ,
158163 finished : false ,
164+ _channel_keepalive : channel. clone ( ) ,
159165 } ;
160166 Ok ( ( sink, recv) )
161167 }
@@ -189,7 +195,12 @@ impl Call {
189195 grpc_sys:: grpcwrap_call_recv_initial_metadata ( call. call , ctx, tag)
190196 } ) ;
191197
192- Ok ( ClientSStreamReceiver :: new ( call, cq_f, method. resp_de ( ) ) )
198+ Ok ( ClientSStreamReceiver :: new (
199+ call,
200+ cq_f,
201+ method. resp_de ( ) ,
202+ channel,
203+ ) )
193204 }
194205
195206 pub fn duplex_streaming < Req , Resp > (
@@ -216,8 +227,8 @@ impl Call {
216227 } ) ;
217228
218229 let share_call = Arc :: new ( SpinLock :: new ( ShareCall :: new ( call, cq_f) ) ) ;
219- let sink = ClientDuplexSender :: new ( share_call. clone ( ) , method. req_ser ( ) ) ;
220- let recv = ClientDuplexReceiver :: new ( share_call, method. resp_de ( ) ) ;
230+ let sink = ClientDuplexSender :: new ( share_call. clone ( ) , method. req_ser ( ) , channel ) ;
231+ let recv = ClientDuplexReceiver :: new ( share_call, method. resp_de ( ) , channel ) ;
221232 Ok ( ( sink, recv) )
222233 }
223234}
@@ -230,14 +241,21 @@ pub struct ClientUnaryReceiver<T> {
230241 call : Call ,
231242 resp_f : BatchFuture ,
232243 resp_de : DeserializeFn < T > ,
244+ _channel_keepalive : Channel ,
233245}
234246
235247impl < T > ClientUnaryReceiver < T > {
236- fn new ( call : Call , resp_f : BatchFuture , resp_de : DeserializeFn < T > ) -> ClientUnaryReceiver < T > {
248+ fn new (
249+ call : Call ,
250+ resp_f : BatchFuture ,
251+ resp_de : DeserializeFn < T > ,
252+ _channel_keepalive : & Channel ,
253+ ) -> ClientUnaryReceiver < T > {
237254 ClientUnaryReceiver {
238255 call,
239256 resp_f,
240257 resp_de,
258+ _channel_keepalive : _channel_keepalive. clone ( ) ,
241259 }
242260 }
243261
@@ -276,6 +294,7 @@ pub struct ClientCStreamReceiver<T> {
276294 call : Arc < SpinLock < ShareCall > > ,
277295 resp_de : DeserializeFn < T > ,
278296 finished : bool ,
297+ _channel_keepalive : Channel ,
279298}
280299
281300impl < T > ClientCStreamReceiver < T > {
@@ -326,15 +345,21 @@ pub struct StreamingCallSink<Req> {
326345 sink_base : SinkBase ,
327346 close_f : Option < BatchFuture > ,
328347 req_ser : SerializeFn < Req > ,
348+ _channel_keepalive : Channel ,
329349}
330350
331351impl < Req > StreamingCallSink < Req > {
332- fn new ( call : Arc < SpinLock < ShareCall > > , req_ser : SerializeFn < Req > ) -> StreamingCallSink < Req > {
352+ fn new (
353+ call : Arc < SpinLock < ShareCall > > ,
354+ req_ser : SerializeFn < Req > ,
355+ _channel_keepalive : & Channel ,
356+ ) -> StreamingCallSink < Req > {
333357 StreamingCallSink {
334358 call,
335359 sink_base : SinkBase :: new ( false ) ,
336360 close_f : None ,
337361 req_ser,
362+ _channel_keepalive : _channel_keepalive. clone ( ) ,
338363 }
339364 }
340365
@@ -490,17 +515,20 @@ impl<H: ShareCallHolder, T> ResponseStreamImpl<H, T> {
490515#[ must_use = "if unused the ClientSStreamReceiver may immediately cancel the RPC" ]
491516pub struct ClientSStreamReceiver < Resp > {
492517 imp : ResponseStreamImpl < ShareCall , Resp > ,
518+ _channel_keepalive : Channel ,
493519}
494520
495521impl < Resp > ClientSStreamReceiver < Resp > {
496522 fn new (
497523 call : Call ,
498524 finish_f : BatchFuture ,
499525 de : DeserializeFn < Resp > ,
526+ _channel_keepalive : & Channel ,
500527 ) -> ClientSStreamReceiver < Resp > {
501528 let share_call = ShareCall :: new ( call, finish_f) ;
502529 ClientSStreamReceiver {
503530 imp : ResponseStreamImpl :: new ( share_call, de) ,
531+ _channel_keepalive : _channel_keepalive. clone ( ) ,
504532 }
505533 }
506534
@@ -528,12 +556,18 @@ impl<Resp> Stream for ClientSStreamReceiver<Resp> {
528556#[ must_use = "if unused the ClientDuplexReceiver may immediately cancel the RPC" ]
529557pub struct ClientDuplexReceiver < Resp > {
530558 imp : ResponseStreamImpl < Arc < SpinLock < ShareCall > > , Resp > ,
559+ _channel_keepalive : Channel ,
531560}
532561
533562impl < Resp > ClientDuplexReceiver < Resp > {
534- fn new ( call : Arc < SpinLock < ShareCall > > , de : DeserializeFn < Resp > ) -> ClientDuplexReceiver < Resp > {
563+ fn new (
564+ call : Arc < SpinLock < ShareCall > > ,
565+ de : DeserializeFn < Resp > ,
566+ _channel_keepalive : & Channel ,
567+ ) -> ClientDuplexReceiver < Resp > {
535568 ClientDuplexReceiver {
536569 imp : ResponseStreamImpl :: new ( call, de) ,
570+ _channel_keepalive : _channel_keepalive. clone ( ) ,
537571 }
538572 }
539573
0 commit comments