@@ -153,6 +153,7 @@ def send_fetches(self):
153153 future = self ._client .send (node_id , request , wakeup = False )
154154 future .add_callback (self ._handle_fetch_response , node_id , fetch_offsets , time .time ())
155155 future .add_errback (self ._handle_fetch_error , node_id )
156+ future .add_both (self ._clear_pending_fetch_request , node_id )
156157 futures .append (future )
157158 self ._fetch_futures .extend (futures )
158159 self ._clean_done_fetch_futures ()
@@ -643,36 +644,42 @@ def _create_fetch_requests(self):
643644 log .debug ("Skipping fetch for partition %s because node %s is throttled" ,
644645 partition , node_id )
645646
647+ elif not self ._client .ready (node_id ):
648+ # Until we support send request queues, any attempt to send to a not-ready node will be
649+ # immediately failed with NodeNotReadyError.
650+ log .debug ("Skipping fetch for partition %s because connection to leader node is not ready yet" )
651+
646652 elif node_id in self ._nodes_with_pending_fetch_requests :
647653 log .debug ("Skipping fetch for partition %s because there is a pending fetch request to node %s" ,
648654 partition , node_id )
649- continue
650655
651- if version < 5 :
652- partition_info = (
653- partition .partition ,
654- position .offset ,
655- self .config ['max_partition_fetch_bytes' ]
656- )
657- elif version <= 8 :
658- partition_info = (
659- partition .partition ,
660- position .offset ,
661- - 1 , # log_start_offset is used internally by brokers / replicas only
662- self .config ['max_partition_fetch_bytes' ],
663- )
664656 else :
665- partition_info = (
666- partition .partition ,
667- position .leader_epoch ,
668- position .offset ,
669- - 1 , # log_start_offset is used internally by brokers / replicas only
670- self .config ['max_partition_fetch_bytes' ],
671- )
672-
673- fetchable [node_id ][partition ] = partition_info
674- log .debug ("Adding fetch request for partition %s at offset %d" ,
675- partition , position .offset )
657+ # Leader is connected and does not have a pending fetch request
658+ if version < 5 :
659+ partition_info = (
660+ partition .partition ,
661+ position .offset ,
662+ self .config ['max_partition_fetch_bytes' ]
663+ )
664+ elif version <= 8 :
665+ partition_info = (
666+ partition .partition ,
667+ position .offset ,
668+ - 1 , # log_start_offset is used internally by brokers / replicas only
669+ self .config ['max_partition_fetch_bytes' ],
670+ )
671+ else :
672+ partition_info = (
673+ partition .partition ,
674+ position .leader_epoch ,
675+ position .offset ,
676+ - 1 , # log_start_offset is used internally by brokers / replicas only
677+ self .config ['max_partition_fetch_bytes' ],
678+ )
679+
680+ fetchable [node_id ][partition ] = partition_info
681+ log .debug ("Adding fetch request for partition %s at offset %d" ,
682+ partition , position .offset )
676683
677684 requests = {}
678685 for node_id , next_partitions in six .iteritems (fetchable ):
@@ -761,14 +768,18 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response):
761768
762769 if self ._sensors :
763770 self ._sensors .fetch_latency .record ((time .time () - send_time ) * 1000 )
764- self ._nodes_with_pending_fetch_requests .remove (node_id )
765771
766772 def _handle_fetch_error (self , node_id , exception ):
767773 level = logging .INFO if isinstance (exception , Errors .Cancelled ) else logging .ERROR
768774 log .log (level , 'Fetch to node %s failed: %s' , node_id , exception )
769775 if node_id in self ._session_handlers :
770776 self ._session_handlers [node_id ].handle_error (exception )
771- self ._nodes_with_pending_fetch_requests .remove (node_id )
777+
778+ def _clear_pending_fetch_request (self , node_id , _ ):
779+ try :
780+ self ._nodes_with_pending_fetch_requests .remove (node_id )
781+ except KeyError :
782+ pass
772783
773784 def _parse_fetched_data (self , completed_fetch ):
774785 tp = completed_fetch .topic_partition
0 commit comments