diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java index 998ac2c585d59..1d36f6d2d81f8 100644 --- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java +++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java @@ -2009,7 +2009,7 @@ public void testShareAutoOffsetResetByDurationInvalidFormat() { public void testShareConsumerAfterCoordinatorMovement() throws Exception { String topicName = "multipart"; String groupId = "multipartGrp"; - Uuid topicId = createTopic(topicName, 3, 3); + Uuid topicId = createTopic(topicName, 1, 3); alterShareAutoOffsetReset(groupId, "earliest"); ScheduledExecutorService service = Executors.newScheduledThreadPool(5); @@ -2253,7 +2253,7 @@ public void testBehaviorOnDeliveryCountBoundary() { public void testComplexShareConsumer() throws Exception { String topicName = "multipart"; String groupId = "multipartGrp"; - createTopic(topicName, 3, 3); + createTopic(topicName, 1, 3); TopicPartition multiTp = new TopicPartition(topicName, 0); ScheduledExecutorService service = Executors.newScheduledThreadPool(5); @@ -2307,7 +2307,6 @@ public void testComplexShareConsumer() throws Exception { serverProperties = { @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"), @ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer,share"), - @ClusterConfigProperty(key = "group.share.enable", value = "true"), @ClusterConfigProperty(key = "group.share.partition.max.record.locks", value = "10000"), @ClusterConfigProperty(key = "group.share.record.lock.duration.ms", value = "15000"), @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java index 51e3fb39dfb0e..7b9e9993c57eb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java @@ -89,6 +89,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi private final IdempotentCloser idempotentCloser = new IdempotentCloser(); private Uuid memberId; private boolean fetchMoreRecords = false; + private final AtomicInteger fetchRecordsNodeId = new AtomicInteger(-1); private final Map> fetchAcknowledgementsToSend; private final Map> fetchAcknowledgementsInFlight; private final Map> acknowledgeRequestStates; @@ -169,7 +170,7 @@ public PollResult poll(long currentTimeMs) { if (nodesWithPendingRequests.contains(node.id())) { log.trace("Skipping fetch for partition {} because previous fetch request to {} has not been processed", partition, node.id()); } else { - // if there is a leader and no in-flight requests, issue a new fetch + // If there is a leader and no in-flight requests, issue a new fetch. ShareSessionHandler handler = handlerMap.computeIfAbsent(node, k -> sessionHandlers.computeIfAbsent(node.id(), n -> new ShareSessionHandler(logContext, n, memberId))); @@ -196,11 +197,15 @@ public PollResult poll(long currentTimeMs) { } topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); + // If we have not chosen a node for fetching records yet, choose now, and rotate the + // assigned partitions so the next poll starts on a different partition. + if (fetchRecordsNodeId.compareAndSet(-1, node.id())) { + subscriptions.movePartitionToEnd(partition); + } log.debug("Added fetch request for partition {} to node {}", tip, node.id()); } } - // Iterate over the session handlers to see if there are acknowledgements to be sent for partitions // which are no longer part of the current subscription. // We fail acknowledgements for records fetched from a previous leader. @@ -226,7 +231,7 @@ public PollResult poll(long currentTimeMs) { topicNamesMap.putIfAbsent(new IdAndPartition(tip.topicId(), tip.partition()), tip.topic()); log.debug("Added fetch request for previously subscribed partition {} to node {}", tip, nodeId); } else { - log.debug("Leader for the partition is down or has changed, failing Acknowledgements for partition {}", tip); + log.debug("Leader for the partition is down or has changed, failing acknowledgements for partition {}", tip); acks.complete(Errors.NOT_LEADER_OR_FOLLOWER.exception()); maybeSendShareAcknowledgeCommitCallbackEvent(Map.of(tip, acks)); } @@ -238,13 +243,27 @@ public PollResult poll(long currentTimeMs) { } }); - // Iterate over the share session handlers and build a list of UnsentRequests + // Iterate over the share session handlers and build a list of UnsentRequests. List requests = handlerMap.entrySet().stream().map(entry -> { Node target = entry.getKey(); ShareSessionHandler handler = entry.getValue(); log.trace("Building ShareFetch request to send to node {}", target.id()); ShareFetchRequest.Builder requestBuilder = handler.newShareFetchBuilder(groupId, fetchConfig); + // We only send a full ShareFetch to a single node at a time. We prepare to + // build ShareFetch requests for all nodes with session handlers to permit + // piggy-backing of acknowledgements, and also to adjust the topic-partitions + // in the share session. + if (target.id() != fetchRecordsNodeId.get()) { + ShareFetchRequestData data = requestBuilder.data(); + // If there's nothing to send, just skip building the record. + if (data.topics().isEmpty() && data.forgottenTopicsData().isEmpty()) { + return null; + } else { + // There is something to send, but we don't want to fetch any records. + requestBuilder.data().setMaxRecords(0); + } + } nodesWithPendingRequests.add(target.id()); @@ -256,11 +275,20 @@ public PollResult poll(long currentTimeMs) { } }; return new UnsentRequest(requestBuilder, Optional.of(target)).whenComplete(responseHandler); - }).collect(Collectors.toList()); + }).filter(Objects::nonNull).collect(Collectors.toList()); return new PollResult(requests); } + @Override + public long maximumTimeToWait(long currentTimeMs) { + // When fetching records and there is no chosen node for fetching, we do not want to wait for the next poll + if (fetchMoreRecords && subscriptions.numAssignedPartitions() > 0 && fetchRecordsNodeId.get() == -1) { + return 0L; + } + return Long.MAX_VALUE; + } + /** * * @return True if we can add acknowledgements to the share session. @@ -849,6 +877,7 @@ private void handleShareFetchSuccess(Node fetchTarget, metricsManager.recordLatency(resp.destination(), resp.requestLatencyMs()); } finally { log.debug("Removing pending request for node {} - success", fetchTarget.id()); + fetchRecordsNodeId.compareAndSet(fetchTarget.id(), -1); nodesWithPendingRequests.remove(fetchTarget.id()); } } @@ -887,6 +916,7 @@ private void handleShareFetchFailure(Node fetchTarget, })); } finally { log.debug("Removing pending request for node {} - failed", fetchTarget.id()); + fetchRecordsNodeId.compareAndSet(fetchTarget.id(), -1); nodesWithPendingRequests.remove(fetchTarget.id()); } } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java index a4268b7eca0a7..4d4070dc7205d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java @@ -90,7 +90,6 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -108,8 +107,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import static java.util.Collections.singleton; -import static java.util.Collections.singletonList; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.internals.events.CompletableEvent.calculateDeadlineMs; @@ -168,6 +165,7 @@ public class ShareConsumeRequestManagerTest { private TestableShareConsumeRequestManager shareConsumeRequestManager; private TestableNetworkClientDelegate networkClientDelegate; private MemoryRecords records; + private MemoryRecords moreRecords; private List acquiredRecords; private List emptyAcquiredRecords; private ShareFetchMetricsRegistry shareFetchMetricsRegistry; @@ -176,6 +174,7 @@ public class ShareConsumeRequestManagerTest { @BeforeEach public void setup() { records = buildRecords(1L, 3, 1); + moreRecords = buildRecords(4L, 1, 4); acquiredRecords = ShareCompletedFetchTest.acquiredRecords(1L, 3); emptyAcquiredRecords = new ArrayList<>(); completedAcknowledgements = new LinkedList<>(); @@ -189,7 +188,7 @@ private void assignFromSubscribed(Set partitions) { // A dummy metadata update to ensure valid leader epoch. metadata.updateWithCurrentRequestVersion(RequestTestUtils.metadataUpdateWithIds("kafka-cluster", 1, - Collections.emptyMap(), topicPartitionCounts, + Map.of(), topicPartitionCounts, tp -> validLeaderEpoch, topicIds), false, 0L); } @@ -209,7 +208,7 @@ private int sendFetches() { public void testFetchNormal() { buildRequestManager(); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Map>> partitionRecords = fetchRecords(); @@ -223,7 +222,7 @@ public void testFetchNormal() { public void testFetchWithAcquiredRecords() { buildRequestManager(); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE); Map>> partitionRecords = fetchRecords(); @@ -239,7 +238,7 @@ public void testMultipleFetches() { buildRequestManager(); // Enabling the config so that background event is sent when the acknowledgement response is received. shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE); @@ -252,7 +251,7 @@ public void testMultipleFetches() { Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); sendFetchAndVerifyResponse(records, ShareCompletedFetchTest.acquiredRecords(2L, 1), Errors.NONE); assertEquals(1.0, @@ -265,10 +264,10 @@ public void testMultipleFetches() { Acknowledgements acknowledgements2 = Acknowledgements.empty(); acknowledgements2.add(2L, AcknowledgeType.REJECT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), Map.of()); // Preparing a response with an acknowledgement error. - sendFetchAndVerifyResponse(records, Collections.emptyList(), Errors.NONE, Errors.INVALID_RECORD_STATE); + sendFetchAndVerifyResponse(records, List.of(), Errors.NONE, Errors.INVALID_RECORD_STATE); assertEquals(2.0, metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal)).metricValue()); @@ -286,7 +285,7 @@ public void testCommitSync() { // Enabling the config so that background event is sent when the acknowledgement response is received. shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); @@ -309,7 +308,7 @@ public void testCommitAsync() { // Enabling the config so that background event is sent when the acknowledgement response is received. shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); @@ -332,7 +331,7 @@ public void testServerDisconnectedOnShareAcknowledge() throws InterruptedExcepti // Enabling the config so that background event is sent when the acknowledgement response is received. shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); fetchRecords(); @@ -384,14 +383,14 @@ public void testAcknowledgeOnClose() { // Enabling the config so that background event is sent when the acknowledgement response is received. shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1L, AcknowledgeType.ACCEPT); // Piggyback acknowledgements - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); // Remaining acknowledgements sent with close(). Acknowledgements acknowledgements2 = getAcknowledgements(2, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); @@ -418,14 +417,14 @@ public void testAcknowledgeOnCloseWithPendingCommitAsync() { // Enabling the config so that background event is sent when the acknowledgement response is received. shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), calculateDeadlineMs(time.timer(defaultApiTimeoutMs))); - shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(), + shareConsumeRequestManager.acknowledgeOnClose(Map.of(), calculateDeadlineMs(time.timer(100))); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); @@ -446,14 +445,14 @@ public void testAcknowledgeOnCloseWithPendingCommitSync() { // Enabling the config so that background event is sent when the acknowledgement response is received. shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), calculateDeadlineMs(time.timer(100))); - shareConsumeRequestManager.acknowledgeOnClose(Collections.emptyMap(), + shareConsumeRequestManager.acknowledgeOnClose(Map.of(), calculateDeadlineMs(time.timer(100))); assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); @@ -549,7 +548,7 @@ public void testResultHandlerCompleteIfEmpty() { public void testBatchingAcknowledgeRequestStates() { buildRequestManager(); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(buildRecords(1L, 6, 1), ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE); @@ -581,7 +580,7 @@ public void testBatchingAcknowledgeRequestStates() { public void testPendingCommitAsyncBeforeCommitSync() { buildRequestManager(); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(buildRecords(1L, 6, 1), ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE); @@ -625,7 +624,7 @@ public void testPendingCommitAsyncBeforeCommitSync() { public void testRetryAcknowledgements() throws InterruptedException { buildRequestManager(); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(buildRecords(1L, 6, 1), ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE); @@ -664,7 +663,7 @@ public void testFatalErrorsAcknowledgementResponse(Errors error) { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); @@ -688,7 +687,7 @@ public void testRetryAcknowledgementsMultipleCommitAsync() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(buildRecords(1L, 6, 1), ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE); @@ -744,7 +743,7 @@ public void testRetryAcknowledgementsMultipleCommitSync() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(buildRecords(1L, 6, 1), ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE); @@ -787,7 +786,7 @@ public void testRetryAcknowledgementsMultipleCommitSync() { public void testPiggybackAcknowledgementsInFlight() { buildRequestManager(); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Acknowledgements acknowledgements = getAcknowledgements(1, @@ -797,7 +796,7 @@ public void testPiggybackAcknowledgementsInFlight() { fetchRecords(); // Piggyback acknowledgements - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); assertEquals(1, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); @@ -807,7 +806,7 @@ public void testPiggybackAcknowledgementsInFlight() { Acknowledgements acknowledgements2 = Acknowledgements.empty(); acknowledgements2.add(3L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), Map.of()); client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); @@ -826,13 +825,13 @@ public void testCommitAsyncWithSubscriptionChange() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName2)); - subscriptions.assignFromSubscribed(Collections.singleton(t2p0)); + subscriptions.subscribeToShareGroup(Set.of(topicName2)); + subscriptions.assignFromSubscribed(Set.of(t2p0)); client.updateMetadata( RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2, 1), @@ -862,13 +861,13 @@ public void testCommitSyncWithSubscriptionChange() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName2)); - subscriptions.assignFromSubscribed(Collections.singleton(t2p0)); + subscriptions.subscribeToShareGroup(Set.of(topicName2)); + subscriptions.assignFromSubscribed(Set.of(t2p0)); client.updateMetadata( RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2, 1), @@ -898,13 +897,13 @@ public void testCloseWithSubscriptionChange() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName2)); - subscriptions.assignFromSubscribed(Collections.singleton(t2p0)); + subscriptions.subscribeToShareGroup(Set.of(topicName2)); + subscriptions.assignFromSubscribed(Set.of(t2p0)); client.updateMetadata( RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2, 1), @@ -929,17 +928,17 @@ public void testCloseWithSubscriptionChange() { public void testShareFetchWithSubscriptionChange() { buildRequestManager(); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT); // Send acknowledgements via ShareFetch - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); fetchRecords(); // Subscription changes. - subscriptions.subscribeToShareGroup(Collections.singleton(topicName2)); - subscriptions.assignFromSubscribed(Collections.singleton(t2p0)); + subscriptions.subscribeToShareGroup(Set.of(topicName2)); + subscriptions.assignFromSubscribed(Set.of(t2p0)); client.updateMetadata( RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName2, 1), @@ -955,8 +954,8 @@ public void testShareFetchWithSubscriptionChange() { public void testShareFetchWithSubscriptionChangeMultipleNodes() { buildRequestManager(); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); - subscriptions.assignFromSubscribed(Collections.singletonList(tp0)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); + subscriptions.assignFromSubscribed(List.of(tp0)); client.updateMetadata( RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2), @@ -974,10 +973,11 @@ public void testShareFetchWithSubscriptionChangeMultipleNodes() { Acknowledgements acknowledgements = getAcknowledgements(0, AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT); // Send acknowledgements via ShareFetch - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); fetchRecords(); + // Subscription changes. - subscriptions.assignFromSubscribed(Collections.singletonList(tp1)); + subscriptions.assignFromSubscribed(List.of(tp1)); NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult(); assertEquals(2, pollResult.unsentRequests.size()); @@ -1022,8 +1022,8 @@ public void testShareFetchWithSubscriptionChangeMultipleNodes() { public void testShareFetchWithSubscriptionChangeMultipleNodesEmptyAcknowledgements() { buildRequestManager(); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); - subscriptions.assignFromSubscribed(Collections.singletonList(tp0)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); + subscriptions.assignFromSubscribed(List.of(tp0)); client.updateMetadata( RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2), @@ -1042,8 +1042,7 @@ public void testShareFetchWithSubscriptionChangeMultipleNodesEmptyAcknowledgemen fetchRecords(); // Change the subscription. - subscriptions.assignFromSubscribed(Collections.singletonList(tp1)); - + subscriptions.assignFromSubscribed(List.of(tp1)); // Now we will be sending the request to node1 only as leader for tip1 is node1. // We do not build the request for tip0 as there are no acknowledgements to send. @@ -1066,7 +1065,7 @@ public void testShareFetchAndCloseMultipleNodes() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); subscriptions.assignFromSubscribed(List.of(tp0, tp1)); client.updateMetadata( @@ -1076,17 +1075,15 @@ public void testShareFetchAndCloseMultipleNodes() { assertEquals(2, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); - client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE)); + client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, new LinkedHashMap<>(), List.of(), 0)); client.prepareResponse(fullFetchResponse(tip1, records, acquiredRecords, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); - Acknowledgements acknowledgements1 = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); Map acknowledgementsMap = new HashMap<>(); - acknowledgementsMap.put(tip0, new NodeAcknowledgements(0, acknowledgements)); - acknowledgementsMap.put(tip1, new NodeAcknowledgements(1, acknowledgements1)); + acknowledgementsMap.put(tip1, new NodeAcknowledgements(1, acknowledgements)); shareConsumeRequestManager.acknowledgeOnClose(acknowledgementsMap, calculateDeadlineMs(time, 1000L)); assertEquals(2, shareConsumeRequestManager.sendAcknowledgements()); @@ -1095,7 +1092,7 @@ public void testShareFetchAndCloseMultipleNodes() { client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertEquals(3, completedAcknowledgements.get(0).get(tip0).size()); + assertNull(completedAcknowledgements.get(0).get(tip0)); assertEquals(3, completedAcknowledgements.get(0).get(tip1).size()); assertEquals(0, shareConsumeRequestManager.sendAcknowledgements()); @@ -1107,7 +1104,7 @@ public void testShareFetchAndCloseMultipleNodes() { public void testRetryAcknowledgementsWithLeaderChange() { buildRequestManager(); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); Set partitions = new HashSet<>(); partitions.add(tp0); subscriptions.assignFromSubscribed(partitions); @@ -1151,7 +1148,7 @@ public void testCallbackHandlerConfig() throws InterruptedException { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(Collections.singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); @@ -1207,7 +1204,7 @@ public void testAcknowledgementCommitCallbackMultiplePartitionCommitAsync() { LinkedHashMap partitionDataMap = new LinkedHashMap<>(); partitionDataMap.put(tip0, partitionDataForFetch(tip0, records, acquiredRecords, Errors.NONE, Errors.NONE)); partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records, acquiredRecords, Errors.NONE, Errors.NONE)); - client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, Collections.emptyList(), 0)); + client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, List.of(), 0)); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); @@ -1253,7 +1250,7 @@ public void testMultipleTopicsFetch() { LinkedHashMap partitionDataMap = new LinkedHashMap<>(); partitionDataMap.put(tip0, partitionDataForFetch(tip0, records, acquiredRecords, Errors.NONE, Errors.NONE)); partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records, emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE)); - client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, Collections.emptyList(), 0)); + client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, List.of(), 0)); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); @@ -1282,7 +1279,7 @@ public void testMultipleTopicsFetchError() { LinkedHashMap partitionDataMap = new LinkedHashMap<>(); partitionDataMap.put(t2ip0, partitionDataForFetch(t2ip0, records, emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE)); partitionDataMap.put(tip0, partitionDataForFetch(tip0, records, acquiredRecords, Errors.NONE, Errors.NONE)); - client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, Collections.emptyList(), 0)); + client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, List.of(), 0)); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); @@ -1304,8 +1301,8 @@ public void testShareFetchInvalidResponse() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); - subscriptions.assignFromSubscribed(Collections.singleton(tp0)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); + subscriptions.assignFromSubscribed(Set.of(tp0)); client.updateMetadata( RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1), @@ -1324,8 +1321,8 @@ public void testShareAcknowledgeInvalidResponse() throws InterruptedException { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); - subscriptions.assignFromSubscribed(Collections.singleton(tp0)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); + subscriptions.assignFromSubscribed(Set.of(tp0)); client.updateMetadata( RequestTestUtils.metadataUpdateWithIds(1, Map.of(topicName, 1), @@ -1362,7 +1359,7 @@ public void testShareAcknowledgeInvalidResponse() throws InterruptedException { Acknowledgements acknowledgements1 = getAcknowledgements(2, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements1)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements1)), Map.of()); assertEquals(1, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); @@ -1390,7 +1387,7 @@ public void testCloseShouldBeIdempotent() { public void testFetchError() { buildRequestManager(); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, emptyAcquiredRecords, Errors.NOT_LEADER_OR_FOLLOWER); Map>> partitionRecords = fetchRecords(); @@ -1402,11 +1399,11 @@ public void testPiggybackAcknowledgementsOnInitialShareSessionError() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult(); assertEquals(1, pollResult.unsentRequests.size()); @@ -1424,7 +1421,7 @@ public void testPiggybackAcknowledgementsOnInitialShareSessionErrorSubscriptionC buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); fetchRecords(); @@ -1437,12 +1434,12 @@ public void testPiggybackAcknowledgementsOnInitialShareSessionErrorSubscriptionC // Simulate a metadata update with no topics in the response. client.updateMetadata( - RequestTestUtils.metadataUpdateWithIds(1, Collections.emptyMap(), + RequestTestUtils.metadataUpdateWithIds(1, Map.of(), tp -> validLeaderEpoch, null, false)); // The acknowledgements for the initial fetch from tip0 are processed now and sent to the background thread. Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); assertEquals(0, completedAcknowledgements.size()); @@ -1460,14 +1457,14 @@ public void testPiggybackAcknowledgementsOnInitialShareSession_ShareSessionNotFo buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE); fetchRecords(); // The acknowledgements for the initial fetch from tip0 are processed now and sent to the background thread. Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); // We attempt to send the acknowledgements piggybacking on the fetch. assertEquals(1, sendFetches()); @@ -1509,7 +1506,7 @@ public void testInvalidDefaultRecordBatch() { buffer.put("beef".getBytes()); buffer.position(0); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); // normal fetch assertEquals(1, sendFetches()); @@ -1540,7 +1537,7 @@ public void testParseInvalidRecordBatch() { // flip some bits to fail the crc buffer.putInt(32, buffer.get(32) ^ 87238423); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); // normal fetch assertEquals(1, sendFetches()); @@ -1572,7 +1569,7 @@ public void testHeaders() { MemoryRecords memoryRecords = builder.build(); List> records; - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); client.prepareResponse(fullFetchResponse(tip0, memoryRecords, @@ -1604,7 +1601,7 @@ record = recordIterator.next(); public void testUnauthorizedTopic() { buildRequestManager(); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); assertEquals(1, sendFetches()); client.prepareResponse(fullFetchResponse(tip0, records, emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED)); @@ -1613,14 +1610,14 @@ public void testUnauthorizedTopic() { collectFetch(); fail("collectFetch should have thrown a TopicAuthorizationException"); } catch (TopicAuthorizationException e) { - assertEquals(singleton(topicName), e.unauthorizedTopics()); + assertEquals(Set.of(topicName), e.unauthorizedTopics()); } } @Test public void testUnknownTopicIdError() { buildRequestManager(); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); assertEquals(1, sendFetches()); client.prepareResponse(fetchResponseWithTopLevelError(tip0, Errors.UNKNOWN_TOPIC_ID)); @@ -1635,7 +1632,7 @@ public void testHandleFetchResponseError(Errors error, boolean hasTopLevelError, boolean shouldRequestMetadataUpdate) { buildRequestManager(); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); assertEquals(1, sendFetches()); @@ -1677,7 +1674,7 @@ private static Stream handleFetchResponseErrorSupplier() { public void testFetchDisconnected() { buildRequestManager(); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); assertEquals(1, sendFetches()); client.prepareResponse(fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE), true); @@ -1710,7 +1707,7 @@ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) { result.outputBuffer().flip(); MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer()); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); assertEquals(1, sendFetches()); client.prepareResponse(fullFetchResponse(tip0, compactedRecords, @@ -1739,7 +1736,7 @@ private MemoryRecords buildRecords(long baseOffset, int count, long firstMessage @Test public void testCorruptMessageError() { buildRequestManager(); - assignFromSubscribed(singleton(tp0)); + assignFromSubscribed(Set.of(tp0)); assertEquals(1, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); @@ -1766,7 +1763,7 @@ public void testCorruptMessageError() { public void testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeaderInformation(Errors error) { buildRequestManager(); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); Set partitions = new HashSet<>(); partitions.add(tp0); partitions.add(tp1); @@ -1794,13 +1791,13 @@ public void testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeade .setRecords(records) .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); partitionData.clear(); partitionData.put(tip1, new ShareFetchResponseData.PartitionData() .setPartitionIndex(tip1.topicPartition().partition()) .setErrorCode(error.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId1); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); @@ -1813,7 +1810,7 @@ public void testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeade Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); assertEquals(startingClusterMetadata, metadata.fetch()); @@ -1846,7 +1843,7 @@ public void testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeade .setRecords(records) .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); @@ -1869,7 +1866,7 @@ public void testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeade public void testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderInformation(Errors error) { buildRequestManager(); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); Set partitions = new HashSet<>(); partitions.add(tp0); partitions.add(tp1); @@ -1896,7 +1893,7 @@ public void testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderIn .setRecords(records) .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); partitionData.clear(); partitionData.put(tip1, new ShareFetchResponseData.PartitionData() @@ -1905,7 +1902,7 @@ public void testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderIn .setCurrentLeader(new ShareFetchResponseData.LeaderIdAndEpoch() .setLeaderId(tp0Leader.id()) .setLeaderEpoch(validLeaderEpoch + 1))); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, singletonList(tp0Leader), 0), nodeId1); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(tp0Leader), 0), nodeId1); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); @@ -1918,7 +1915,7 @@ public void testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderIn Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); // The metadata snapshot will have been updated with the new leader information assertNotEquals(startingClusterMetadata, metadata.fetch()); @@ -1944,7 +1941,7 @@ public void testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderIn .setRecords(records) .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); @@ -1968,7 +1965,7 @@ public void testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); Set partitions = new HashSet<>(); partitions.add(tp0); partitions.add(tp1); @@ -1994,13 +1991,13 @@ public void testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) { .setRecords(records) .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); partitionData.clear(); partitionData.put(tip1, new ShareFetchResponseData.PartitionData() .setPartitionIndex(tip1.topicPartition().partition()) .setErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId1); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); @@ -2013,7 +2010,7 @@ public void testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) { Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1L, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); assertEquals(startingClusterMetadata, metadata.fetch()); @@ -2037,7 +2034,7 @@ public void testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) { .setPartitionIndex(tip0.topicPartition().partition()) .setErrorCode(Errors.NONE.code()) .setAcknowledgeErrorCode(error.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); partitionData.clear(); partitionData.put(tip0, new ShareFetchResponseData.PartitionData() @@ -2052,7 +2049,7 @@ public void testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) { .setRecords(records) .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId1); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); @@ -2071,7 +2068,7 @@ void testLeadershipChangeAfterFetchBeforeCommitAsync() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); Set partitions = new HashSet<>(); partitions.add(tp0); partitions.add(tp1); @@ -2093,11 +2090,8 @@ void testLeadershipChangeAfterFetchBeforeCommitAsync() { partitionData.put(tip0, new ShareFetchResponseData.PartitionData() .setPartitionIndex(tip0.topicPartition().partition()) - .setErrorCode(Errors.NONE.code()) - .setRecords(records) - .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) - .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0); + .setErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); partitionData.clear(); partitionData.put(tip1, new ShareFetchResponseData.PartitionData() @@ -2106,52 +2100,44 @@ void testLeadershipChangeAfterFetchBeforeCommitAsync() { .setRecords(records) .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2)) .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId1); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Map>> partitionRecords = fetchRecords(); - assertTrue(partitionRecords.containsKey(tp0)); + assertFalse(partitionRecords.containsKey(tp0)); assertTrue(partitionRecords.containsKey(tp1)); - List> fetchedRecords = partitionRecords.get(tp0); - assertEquals(1, fetchedRecords.size()); - - fetchedRecords = partitionRecords.get(tp1); + List> fetchedRecords = partitionRecords.get(tp1); assertEquals(2, fetchedRecords.size()); - Acknowledgements acknowledgementsTp0 = Acknowledgements.empty(); - acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT); - Acknowledgements acknowledgementsTp1 = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT); Map commitAcks = new HashMap<>(); - commitAcks.put(tip0, new NodeAcknowledgements(0, acknowledgementsTp0)); commitAcks.put(tip1, new NodeAcknowledgements(1, acknowledgementsTp1)); - // Move the leadership of tp0 onto node 1 + // Move the leadership of tp1 onto node 0 HashMap partitionLeaders = new HashMap<>(); - partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1))); + partitionLeaders.put(tp1, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId0.id()), Optional.of(validLeaderEpoch + 1))); metadata.updatePartitionLeadership(partitionLeaders, List.of()); assertNotEquals(startingClusterMetadata, metadata.fetch()); - // We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception. + // We fail the acknowledgements for records which were received from node1 with NOT_LEADER_OR_FOLLOWER exception. shareConsumeRequestManager.commitAsync(commitAcks, calculateDeadlineMs(time.timer(defaultApiTimeoutMs))); assertEquals(1, completedAcknowledgements.get(0).size()); - assertEquals(acknowledgementsTp0, completedAcknowledgements.get(0).get(tip0)); - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + assertEquals(acknowledgementsTp1, completedAcknowledgements.get(0).get(tip1)); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip1).getAcknowledgeException()); + completedAcknowledgements.clear(); - // We only send acknowledgements for tip1 to node1. - assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); + // There are no acknowledgements to send. + assertEquals(0, shareConsumeRequestManager.sendAcknowledgements()); client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertEquals(1, completedAcknowledgements.get(1).size()); - assertEquals(acknowledgementsTp1, completedAcknowledgements.get(1).get(tip1)); - assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException()); + assertTrue(completedAcknowledgements.isEmpty()); } @Test @@ -2159,7 +2145,7 @@ void testLeadershipChangeAfterFetchBeforeCommitSync() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); subscriptions.assignFromSubscribed(List.of(tp0, tp1)); client.updateMetadata( @@ -2178,11 +2164,8 @@ void testLeadershipChangeAfterFetchBeforeCommitSync() { partitionData.put(tip0, new ShareFetchResponseData.PartitionData() .setPartitionIndex(tip0.topicPartition().partition()) - .setErrorCode(Errors.NONE.code()) - .setRecords(records) - .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) - .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0); + .setErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); partitionData.clear(); partitionData.put(tip1, new ShareFetchResponseData.PartitionData() @@ -2191,33 +2174,26 @@ void testLeadershipChangeAfterFetchBeforeCommitSync() { .setRecords(records) .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2)) .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId1); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Map>> partitionRecords = fetchRecords(); - assertTrue(partitionRecords.containsKey(tp0)); + assertFalse(partitionRecords.containsKey(tp0)); assertTrue(partitionRecords.containsKey(tp1)); - List> fetchedRecords = partitionRecords.get(tp0); - assertEquals(1, fetchedRecords.size()); - - fetchedRecords = partitionRecords.get(tp1); + List> fetchedRecords = partitionRecords.get(tp1); assertEquals(2, fetchedRecords.size()); - Acknowledgements acknowledgementsTp0 = Acknowledgements.empty(); - acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT); - Acknowledgements acknowledgementsTp1 = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT); Map commitAcks = new HashMap<>(); - commitAcks.put(tip0, new NodeAcknowledgements(0, acknowledgementsTp0)); commitAcks.put(tip1, new NodeAcknowledgements(1, acknowledgementsTp1)); - // Move the leadership of tp0 onto node 1 + // Move the leadership of tp1 onto node 0 HashMap partitionLeaders = new HashMap<>(); - partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1))); + partitionLeaders.put(tp1, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId0.id()), Optional.of(validLeaderEpoch + 1))); metadata.updatePartitionLeadership(partitionLeaders, List.of()); assertNotEquals(startingClusterMetadata, metadata.fetch()); @@ -2227,18 +2203,17 @@ void testLeadershipChangeAfterFetchBeforeCommitSync() { // Verify if the callback was invoked with the failed acknowledgements. assertEquals(1, completedAcknowledgements.get(0).size()); - assertEquals(acknowledgementsTp0, completedAcknowledgements.get(0).get(tip0)); - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + assertEquals(acknowledgementsTp1, completedAcknowledgements.get(0).get(tip1)); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip1).getAcknowledgeException()); + completedAcknowledgements.clear(); - // We only send acknowledgements for tip1 to node1. - assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()); + // There are no acknowledgements to send. + assertEquals(0, shareConsumeRequestManager.sendAcknowledgements()); client.prepareResponse(fullAcknowledgeResponse(tip1, Errors.NONE)); networkClientDelegate.poll(time.timer(0)); - assertEquals(1, completedAcknowledgements.get(1).size()); - assertEquals(acknowledgementsTp1, completedAcknowledgements.get(1).get(tip1)); - assertNull(completedAcknowledgements.get(1).get(tip1).getAcknowledgeException()); + assertTrue(completedAcknowledgements.isEmpty()); } @Test @@ -2246,10 +2221,8 @@ void testLeadershipChangeAfterFetchBeforeClose() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); - Set partitions = new HashSet<>(); - partitions.add(tp0); - partitions.add(tp1); + subscriptions.subscribeToShareGroup(Set.of(topicName)); + Set partitions = Set.of(tp0, tp1); subscriptions.assignFromSubscribed(partitions); client.updateMetadata( @@ -2268,11 +2241,8 @@ void testLeadershipChangeAfterFetchBeforeClose() { partitionData.put(tip0, new ShareFetchResponseData.PartitionData() .setPartitionIndex(tip0.topicPartition().partition()) - .setErrorCode(Errors.NONE.code()) - .setRecords(records) - .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) - .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0); + .setErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); partitionData.clear(); partitionData.put(tip1, new ShareFetchResponseData.PartitionData() @@ -2281,57 +2251,47 @@ void testLeadershipChangeAfterFetchBeforeClose() { .setRecords(records) .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2)) .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId1); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Map>> partitionRecords = fetchRecords(); - assertTrue(partitionRecords.containsKey(tp0)); + assertFalse(partitionRecords.containsKey(tp0)); assertTrue(partitionRecords.containsKey(tp1)); - List> fetchedRecords = partitionRecords.get(tp0); - assertEquals(1, fetchedRecords.size()); - - fetchedRecords = partitionRecords.get(tp1); + List> fetchedRecords = partitionRecords.get(tp1); assertEquals(2, fetchedRecords.size()); - Acknowledgements acknowledgementsTp0 = Acknowledgements.empty(); - acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT); - Acknowledgements acknowledgementsTp1 = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Map.of(tip1, new NodeAcknowledgements(1, acknowledgementsTp1)), Collections.emptyMap()); - - // Move the leadership of tp0 onto node 1 + // Move the leadership of tp1 onto node 0 HashMap partitionLeaders = new HashMap<>(); - partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1))); + partitionLeaders.put(tp1, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId0.id()), Optional.of(validLeaderEpoch + 1))); metadata.updatePartitionLeadership(partitionLeaders, List.of()); assertNotEquals(startingClusterMetadata, metadata.fetch()); - // We fail the acknowledgements for records which were received from node0 with NOT_LEADER_OR_FOLLOWER exception. - shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgementsTp0)), + // We fail the acknowledgements for records which were received from node1 with NOT_LEADER_OR_FOLLOWER exception. + shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip1, new NodeAcknowledgements(1, acknowledgementsTp1)), calculateDeadlineMs(time.timer(100))); // Verify if the callback was invoked with the failed acknowledgements. assertEquals(1, completedAcknowledgements.get(0).size()); - assertEquals(acknowledgementsTp0.getAcknowledgementsTypeMap(), completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap()); - assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + assertEquals(acknowledgementsTp1.getAcknowledgementsTypeMap(), completedAcknowledgements.get(0).get(tip1).getAcknowledgementsTypeMap()); + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), completedAcknowledgements.get(0).get(tip1).getAcknowledgeException()); completedAcknowledgements.clear(); - // As we are closing, we still send the request to both the nodes, but with empty acknowledgements to node0, as it is no longer the leader. + // As we are closing, we still send the request to both the nodes, but with empty acknowledgements to node1, as it is no longer the leader. assertEquals(2, shareConsumeRequestManager.sendAcknowledgements()); - client.prepareResponseFrom(fullAcknowledgeResponse(tip1, Errors.NONE), nodeId1); + client.prepareResponseFrom(fullAcknowledgeResponse(tip0, Errors.NONE), nodeId0); networkClientDelegate.poll(time.timer(0)); - client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId0); + client.prepareResponseFrom(emptyAcknowledgeResponse(), nodeId1); networkClientDelegate.poll(time.timer(0)); - assertEquals(1, completedAcknowledgements.get(0).size()); - assertEquals(acknowledgementsTp1, completedAcknowledgements.get(0).get(tip1)); - assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException()); + assertTrue(completedAcknowledgements.isEmpty()); } @Test @@ -2339,12 +2299,13 @@ void testWhenLeadershipChangedAfterDisconnected() { buildRequestManager(); shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true); - subscriptions.subscribeToShareGroup(Collections.singleton(topicName)); + subscriptions.subscribeToShareGroup(Set.of(topicName)); Set partitions = new HashSet<>(); partitions.add(tp0); partitions.add(tp1); subscriptions.assignFromSubscribed(partitions); + // This spreads the leadership of the partitions across the available nodes client.updateMetadata( RequestTestUtils.metadataUpdateWithIds(2, Map.of(topicName, 2), tp -> validLeaderEpoch, topicIds, false)); @@ -2354,6 +2315,9 @@ void testWhenLeadershipChangedAfterDisconnected() { Cluster startingClusterMetadata = metadata.fetch(); assertFalse(metadata.updateRequested()); + // The first poll sends ShareFetch to both nodes + // - node 0 - establishing the share session, but not fetching records + // - node 1 - fetching records from tp1 assertEquals(2, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); @@ -2361,37 +2325,35 @@ void testWhenLeadershipChangedAfterDisconnected() { partitionData.put(tip0, new ShareFetchResponseData.PartitionData() .setPartitionIndex(tip0.topicPartition().partition()) - .setErrorCode(Errors.NONE.code()) - .setRecords(records) - .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) - .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0); + .setErrorCode(Errors.NONE.code())); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); partitionData.clear(); partitionData.put(tip1, new ShareFetchResponseData.PartitionData() .setPartitionIndex(tip1.topicPartition().partition()) - .setErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId1); + .setErrorCode(Errors.NONE.code()) + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); Map>> partitionRecords = fetchRecords(); - assertTrue(partitionRecords.containsKey(tp0)); - assertFalse(partitionRecords.containsKey(tp1)); + assertFalse(partitionRecords.containsKey(tp0)); + assertTrue(partitionRecords.containsKey(tp1)); - List> fetchedRecords = partitionRecords.get(tp0); + List> fetchedRecords = partitionRecords.get(tp1); assertEquals(1, fetchedRecords.size()); Acknowledgements acknowledgements = Acknowledgements.empty(); acknowledgements.add(1, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); + shareConsumeRequestManager.fetch(Map.of(tip1, new NodeAcknowledgements(1, acknowledgements)), Map.of()); assertEquals(startingClusterMetadata, metadata.fetch()); - acknowledgements = Acknowledgements.empty(); - acknowledgements.add(1, AcknowledgeType.ACCEPT); - shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap()); - + // The second poll sends ShareFetch to both nodes + // - node 0 - fetching records from tp0 + // - node 1 - acknowledges records from tp1, but not fetching records assertEquals(2, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); @@ -2400,39 +2362,38 @@ void testWhenLeadershipChangedAfterDisconnected() { new ShareFetchResponseData.PartitionData() .setPartitionIndex(tip0.topicPartition().partition()) .setErrorCode(Errors.NONE.code()) - .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0, true); + .setRecords(records) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1))); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); partitionData.clear(); partitionData.put(tip1, new ShareFetchResponseData.PartitionData() .setPartitionIndex(tip1.topicPartition().partition()) - .setRecords(records) - .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)) + .setErrorCode(Errors.NONE.code()) .setAcknowledgeErrorCode(Errors.NONE.code())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId1); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId1, true); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); // The node was disconnected, so the acknowledgement failed - assertInstanceOf(DisconnectException.class, completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); + assertInstanceOf(DisconnectException.class, completedAcknowledgements.get(0).get(tip1).getAcknowledgeException()); completedAcknowledgements.clear(); partitionRecords = fetchRecords(); - assertFalse(partitionRecords.containsKey(tp0)); - assertTrue(partitionRecords.containsKey(tp1)); + assertTrue(partitionRecords.containsKey(tp0)); + assertFalse(partitionRecords.containsKey(tp1)); - fetchedRecords = partitionRecords.get(tp1); - assertEquals(1, fetchedRecords.size()); + shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Map.of()); - // Move the leadership of tp0 onto node 1 + // Move the leadership of tp1 onto node 0 HashMap partitionLeaders = new HashMap<>(); - partitionLeaders.put(tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(validLeaderEpoch + 1))); + partitionLeaders.put(tp1, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId0.id()), Optional.of(validLeaderEpoch + 1))); metadata.updatePartitionLeadership(partitionLeaders, List.of()); assertNotEquals(startingClusterMetadata, metadata.fetch()); - shareConsumeRequestManager.fetch(Map.of(tip1, new NodeAcknowledgements(1, acknowledgements)), Collections.emptyMap()); - + // The third poll sends ShareFetch to one node + // - node 0 - acknowledging records from tp0, and fetching records from tp0 and tp1 assertEquals(1, sendFetches()); assertFalse(shareConsumeRequestManager.hasCompletedFetches()); @@ -2446,19 +2407,23 @@ void testWhenLeadershipChangedAfterDisconnected() { .setAcknowledgeErrorCode(Errors.NONE.code())); partitionData.put(tip1, new ShareFetchResponseData.PartitionData() - .setPartitionIndex(tip1.topicPartition().partition())); - client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId1); + .setPartitionIndex(tip1.topicPartition().partition()) + .setRecords(moreRecords) + .setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(4L, 1))); + client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, List.of(), 0), nodeId0); networkClientDelegate.poll(time.timer(0)); assertTrue(shareConsumeRequestManager.hasCompletedFetches()); - assertNull(completedAcknowledgements.get(0).get(tip1).getAcknowledgeException()); + assertNull(completedAcknowledgements.get(0).get(tip0).getAcknowledgeException()); partitionRecords = fetchRecords(); assertTrue(partitionRecords.containsKey(tp0)); - assertFalse(partitionRecords.containsKey(tp1)); + assertTrue(partitionRecords.containsKey(tp1)); fetchedRecords = partitionRecords.get(tp0); assertEquals(1, fetchedRecords.size()); + fetchedRecords = partitionRecords.get(tp1); + assertEquals(1, fetchedRecords.size()); } @Test @@ -2496,7 +2461,7 @@ private ShareFetchResponse fetchResponseWithTopLevelError(TopicIdPartition tp, E new ShareFetchResponseData.PartitionData() .setPartitionIndex(tp.topicPartition().partition()) .setErrorCode(error.code())); - return ShareFetchResponse.of(error, 0, new LinkedHashMap<>(partitions), Collections.emptyList(), 0); + return ShareFetchResponse.of(error, 0, new LinkedHashMap<>(partitions), List.of(), 0); } private ShareFetchResponse fullFetchResponse(TopicIdPartition tp, @@ -2513,30 +2478,30 @@ private ShareFetchResponse fullFetchResponse(TopicIdPartition tp, Errors acknowledgeError) { Map partitions = Map.of(tp, partitionDataForFetch(tp, records, acquiredRecords, error, acknowledgeError)); - return ShareFetchResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList(), 0); + return ShareFetchResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), List.of(), 0); } private ShareAcknowledgeResponse emptyAcknowledgeResponse() { - Map partitions = Collections.emptyMap(); - return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList()); + Map partitions = Map.of(); + return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), List.of()); } private ShareAcknowledgeResponse acknowledgeResponseWithTopLevelError(TopicIdPartition tp, Errors error) { Map partitions = Map.of(tp, partitionDataForAcknowledge(tp, Errors.NONE)); - return ShareAcknowledgeResponse.of(error, 0, new LinkedHashMap<>(partitions), Collections.emptyList()); + return ShareAcknowledgeResponse.of(error, 0, new LinkedHashMap<>(partitions), List.of()); } private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition tp, Errors error) { Map partitions = Map.of(tp, partitionDataForAcknowledge(tp, error)); - return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList()); + return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), List.of()); } private ShareAcknowledgeResponse fullAcknowledgeResponse(Map partitionErrorsMap) { Map partitions = new HashMap<>(); partitionErrorsMap.forEach((tip, error) -> partitions.put(tip, partitionDataForAcknowledge(tip, error))); - return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), Collections.emptyList()); + return ShareAcknowledgeResponse.of(Errors.NONE, 0, new LinkedHashMap<>(partitions), List.of()); } private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition tp, @@ -2584,7 +2549,7 @@ private ShareAcknowledgeResponseData.PartitionData partitionDataForAcknowledge(T */ private void assertEmptyFetch(String reason) { ShareFetch fetch = collectFetch(); - assertEquals(Collections.emptyMap(), fetch.records(), reason); + assertEquals(Map.of(), fetch.records(), reason); assertTrue(fetch.isEmpty(), reason); } @@ -2600,7 +2565,7 @@ private Acknowledgements getAcknowledgements(int startIndex, AcknowledgeType... private Map>> fetchRecords() { ShareFetch fetch = collectFetch(); if (fetch.isEmpty()) { - return Collections.emptyMap(); + return Map.of(); } return fetch.records(); } diff --git a/core/src/main/java/kafka/server/share/SharePartitionManager.java b/core/src/main/java/kafka/server/share/SharePartitionManager.java index 3c9c727c53a64..b373ca159715f 100644 --- a/core/src/main/java/kafka/server/share/SharePartitionManager.java +++ b/core/src/main/java/kafka/server/share/SharePartitionManager.java @@ -620,8 +620,8 @@ private static String partitionsToLogString(Collection partiti // Visible for testing. void processShareFetch(ShareFetch shareFetch) { - if (shareFetch.topicIdPartitions().isEmpty()) { - // If there are no partitions to fetch then complete the future with an empty map. + if (shareFetch.topicIdPartitions().isEmpty() || shareFetch.maxFetchRecords() == 0 || shareFetch.fetchParams().maxBytes == 0) { + // If there are no partitions or no data requested to fetch then complete the future with an empty map. shareFetch.maybeComplete(Map.of()); return; }