Skip to content

Commit c58cf1d

Browse files
authored
KAFKA-19763: Parallel remote reads cause memory leak in broker (#20654)
Broker heap memory gets filled up and throws OOM error when remote reads are triggered for multiple partitions within a FETCH request. Steps to reproduce: 1. Start a one node broker and configure LocalTieredStorage as remote storage. 2. Create a topic with 5 partitions. 3. Produce message and ensure that few segments are uploaded to remote. 4. Start a consumer to read from those 5 partitions. Seek the offset to beginning for 4 partitions and to end for 1 partition. This is to simulate that the FETCH request read from both remote-log and local-log. 5. The broker crashes with the OOM error. 6. The DelayedRemoteFetch / RemoteLogReadResult references are being held by the purgatory, so the broker crashes. Reviewers: Luke Chen <[email protected]>, Satish Duggana <[email protected]>
1 parent 616b023 commit c58cf1d

File tree

2 files changed

+184
-51
lines changed

2 files changed

+184
-51
lines changed

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -251,9 +251,12 @@ class ReplicaManager(val config: KafkaConfig,
251251
new DelayedOperationPurgatory[DelayedDeleteRecords](
252252
"DeleteRecords", config.brokerId,
253253
config.deleteRecordsPurgatoryPurgeIntervalRequests))
254+
// delayedRemoteFetchPurgatory purgeInterval is set to 0 to release the references of completed DelayedRemoteFetch
255+
// instances immediately for GC. The DelayedRemoteFetch instance internally holds the RemoteLogReadResult that can be
256+
// up to the size of `fetch.max.bytes` which defaults to 50 MB.
254257
val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
255258
new DelayedOperationPurgatory[DelayedRemoteFetch](
256-
"RemoteFetch", config.brokerId))
259+
"RemoteFetch", config.brokerId, 0))
257260
val delayedRemoteListOffsetsPurgatory = delayedRemoteListOffsetsPurgatoryParam.getOrElse(
258261
new DelayedOperationPurgatory[DelayedRemoteListOffsets](
259262
"RemoteListOffsets", config.brokerId))
@@ -1637,7 +1640,7 @@ class ReplicaManager(val config: KafkaConfig,
16371640
params: FetchParams,
16381641
responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
16391642
logReadResults: Seq[(TopicIdPartition, LogReadResult)],
1640-
remoteFetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
1643+
fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Unit = {
16411644
val remoteFetchTasks = new util.HashMap[TopicIdPartition, Future[Void]]
16421645
val remoteFetchResults = new util.HashMap[TopicIdPartition, CompletableFuture[RemoteLogReadResult]]
16431646

@@ -1649,10 +1652,10 @@ class ReplicaManager(val config: KafkaConfig,
16491652

16501653
val remoteFetchMaxWaitMs = config.remoteLogManagerConfig.remoteFetchMaxWaitMs().toLong
16511654
val remoteFetch = new DelayedRemoteFetch(remoteFetchTasks, remoteFetchResults, remoteFetchInfos, remoteFetchMaxWaitMs,
1652-
remoteFetchPartitionStatus, params, logReadResults, this, responseCallback)
1655+
fetchPartitionStatus, params, logReadResults, this, responseCallback)
16531656

16541657
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
1655-
val delayedFetchKeys = remoteFetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
1658+
val delayedFetchKeys = remoteFetchTasks.asScala.map { case (tp, _) => new TopicPartitionOperationKey(tp) }.toList
16561659
delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, delayedFetchKeys.asJava)
16571660
}
16581661

@@ -1737,6 +1740,8 @@ class ReplicaManager(val config: KafkaConfig,
17371740
// try to complete the request immediately, otherwise put it into the purgatory;
17381741
// this is because while the delayed fetch operation is being created, new requests
17391742
// may arrive and hence make this operation completable.
1743+
// We only guarantee eventual cleanup via the next FETCH request for the same set of partitions or
1744+
// using reaper-thread.
17401745
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys.asJava)
17411746
}
17421747
}
@@ -1926,11 +1931,14 @@ class ReplicaManager(val config: KafkaConfig,
19261931
Optional.empty()
19271932
)
19281933
} else {
1929-
// For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
1930-
// For the topic-partitions that need remote data, we will use this information to read the data in another thread.
1931-
new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(),
1932-
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp,
1933-
fetchInfo, params.isolation)))
1934+
val remoteStorageFetchInfoOpt = if (adjustedMaxBytes > 0) {
1935+
// For consume fetch requests, create a dummy FetchDataInfo with the remote storage fetch information.
1936+
// For the topic-partitions that need remote data, we will use this information to read the data in another thread.
1937+
Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp, fetchInfo, params.isolation))
1938+
} else {
1939+
Optional.empty[RemoteStorageFetchInfo]()
1940+
}
1941+
new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, Optional.empty(), remoteStorageFetchInfoOpt)
19341942
}
19351943

19361944
new LogReadResult(fetchDataInfo,

0 commit comments

Comments
 (0)