Skip to content

Commit 3250395

Browse files
committed
Initiate Coordinator Reconnect w/ Backoff from Heartbeat Thread (#2695)
1 parent c505a9e commit 3250395

File tree

2 files changed

+4
-1
lines changed

2 files changed

+4
-1
lines changed

kafka/consumer/group.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -723,7 +723,9 @@ def _poll_once(self, timer, max_records, update_offsets=True):
723723

724724
# We do not want to be stuck blocking in poll if we are missing some positions
725725
# since the offset lookup may be backing off after a failure
726-
poll_timeout_ms = min(timer.timeout_ms, self._coordinator.time_to_next_poll() * 1000)
726+
poll_timeout_ms = timer.timeout_ms
727+
if self.config['group_id'] is not None:
728+
poll_timeout_ms = min(poll_timeout_ms, self._coordinator.time_to_next_poll() * 1000)
727729
if not has_all_fetch_positions:
728730
log.debug('poll: do not have all fetch positions...')
729731
poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms'])

kafka/coordinator/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,6 +1120,7 @@ def _run_once(self):
11201120
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
11211121

11221122
elif not self.coordinator.connected():
1123+
self.coordinator._client.maybe_connect(self.coordinator.coordinator_id)
11231124
self.coordinator._client._lock.release()
11241125
self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000)
11251126

0 commit comments

Comments
 (0)