Skip to content

Commit 76cbe5f

Browse files
[-] test + fix
1 parent 1503743 commit 76cbe5f

File tree

9 files changed

+111
-3
lines changed

9 files changed

+111
-3
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ struct TEvPQ {
198198
EvRunCompaction,
199199
EvMirrorTopicDescription,
200200
EvBroadcastPartitionError,
201+
EvForceCompaction,
201202
EvEnd
202203
};
203204

@@ -1278,6 +1279,15 @@ struct TEvPQ {
12781279

12791280
ui64 BlobsCount = 0;
12801281
};
1282+
1283+
struct TEvForceCompaction : TEventLocal<TEvForceCompaction, EvForceCompaction> {
1284+
explicit TEvForceCompaction(const ui32 partitionId) :
1285+
PartitionId(partitionId)
1286+
{
1287+
}
1288+
1289+
ui32 PartitionId = 0;
1290+
};
12811291
};
12821292

12831293
} //NKikimr

ydb/core/persqueue/pqtablet/partition/partition.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ class TPartition : public TBaseActor<TPartition> {
246246
void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
247247
void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx);
248248
void Handle(TEvPQ::TEvRunCompaction::TPtr& ev);
249+
void Handle(TEvPQ::TEvForceCompaction::TPtr& ev);
249250
void Handle(TEvPQ::TEvExclusiveLockAcquired::TPtr& ev);
250251
void Handle(TEvPQ::TBroadcastPartitionError::TPtr& ev, const TActorContext& ctx);
251252
void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
@@ -596,6 +597,7 @@ class TPartition : public TBaseActor<TPartition> {
596597
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
597598
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
598599
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
600+
hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
599601
default:
600602
if (!Initializer.Handle(ev)) {
601603
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
@@ -664,6 +666,7 @@ class TPartition : public TBaseActor<TPartition> {
664666
HFuncTraced(TEvPQ::TEvDeletePartition, Handle);
665667
IgnoreFunc(TEvPQ::TEvTxBatchComplete);
666668
hFuncTraced(TEvPQ::TEvRunCompaction, Handle);
669+
hFuncTraced(TEvPQ::TEvForceCompaction, Handle);
667670
default:
668671
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
669672
break;
@@ -1116,7 +1119,7 @@ class TPartition : public TBaseActor<TPartition> {
11161119
const TEvPQ::TEvBlobResponse* blobResponse,
11171120
const TActorContext& ctx);
11181121

1119-
void TryRunCompaction();
1122+
void TryRunCompaction(bool force = false);
11201123
void BlobsForCompactionWereRead(const TVector<NPQ::TRequestedBlob>& blobs);
11211124
void BlobsForCompactionWereWrite();
11221125
ui64 NextReadCookie();

ydb/core/persqueue/pqtablet/partition/partition_compaction.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ void TPartition::DumpKeysForBlobsCompaction() const
164164
LOG_D("===================================");
165165
}
166166

167-
void TPartition::TryRunCompaction()
167+
void TPartition::TryRunCompaction(bool force)
168168
{
169169
if (StopCompaction) {
170170
LOG_D("Blobs compaction is stopped");
@@ -186,7 +186,7 @@ void TPartition::TryRunCompaction()
186186
const ui64 blobsKeyCountLimit = GetBodyKeysCountLimit();
187187
const ui64 compactedBlobSizeLowerBound = GetCompactedBlobSizeLowerBound();
188188

189-
if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit())) {
189+
if ((BlobEncoder.DataKeysBody.size() < blobsKeyCountLimit) && (BlobEncoder.GetSize() < GetCumulativeSizeLimit()) && !force) {
190190
LOG_D("No data for blobs compaction");
191191
return;
192192
}
@@ -207,13 +207,19 @@ void TPartition::TryRunCompaction()
207207
LOG_D("Blob key for rename " << k.Key.ToString());
208208
}
209209
}
210+
210211
LOG_D(blobsCount << " keys were taken away. Let's read " << blobsSize << " bytes");
211212

212213
CompactionInProgress = true;
213214

214215
Send(SelfId(), new TEvPQ::TEvRunCompaction(blobsCount));
215216
}
216217

218+
void TPartition::Handle(TEvPQ::TEvForceCompaction::TPtr&)
219+
{
220+
TryRunCompaction(true);
221+
}
222+
217223
void TPartition::Handle(TEvPQ::TEvRunCompaction::TPtr& ev)
218224
{
219225
const ui64 blobsCount = ev->Get()->BlobsCount;

ydb/core/persqueue/pqtablet/partition/partition_init.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,8 @@ void TInitDataRangeStep::FormHeadAndProceed() {
778778

779779
cz.Head.Offset = headKey.GetOffset();
780780
cz.Head.PartNo = headKey.GetPartNo();
781+
782+
Partition()->WasTheLastBlobBig = false;
781783
}
782784

783785
// FastWrite Body

ydb/core/persqueue/pqtablet/pq_impl.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5238,6 +5238,23 @@ void TPersQueue::ProcessPendingEvents()
52385238
}
52395239
}
52405240

5241+
void TPersQueue::Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx)
5242+
{
5243+
PQ_LOG_D("TPersQueue::Handle(TEvPQ::TEvForceCompaction)");
5244+
5245+
const auto& event = *ev->Get();
5246+
const TPartitionId partitionId(event.PartitionId);
5247+
5248+
if (!Partitions.contains(partitionId)) {
5249+
PQ_LOG_D("Unknown partition id " << event.PartitionId);
5250+
return;
5251+
}
5252+
5253+
auto p = Partitions.find(partitionId);
5254+
ctx.Send(p->second.Actor,
5255+
new TEvPQ::TEvForceCompaction(event.PartitionId));
5256+
}
5257+
52415258
bool TPersQueue::HandleHook(STFUNC_SIG)
52425259
{
52435260
TRACE_EVENT(NKikimrServices::PERSQUEUE);
@@ -5285,6 +5302,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
52855302
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
52865303
HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle);
52875304
HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle);
5305+
HFuncTraced(TEvPQ::TEvForceCompaction, Handle);
52885306
default:
52895307
return false;
52905308
}

ydb/core/persqueue/pqtablet/pq_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,6 +603,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
603603

604604
void ResendSplitMergeRequests(const TActorContext& ctx);
605605

606+
void Handle(TEvPQ::TEvForceCompaction::TPtr& ev, const TActorContext& ctx);
606607

607608
TIntrusivePtr<NJaegerTracing::TSamplingThrottlingControl> SamplingControl;
608609
NWilson::TSpan WriteTxsSpan;

ydb/core/persqueue/ut/common/pq_ut_common.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1250,4 +1250,19 @@ THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(T
12501250
return runtime.GrabEdgeEvent<TEvPersQueue::TEvPeriodicTopicStats>(TDuration::Seconds(2));
12511251
}
12521252

1253+
void CmdRunCompaction(TTestActorRuntime& runtime,
1254+
ui64 tabletId,
1255+
const TActorId& sender,
1256+
const ui32 partition)
1257+
{
1258+
auto event = MakeHolder<TEvPQ::TEvForceCompaction>(partition);
1259+
runtime.SendToPipe(tabletId, sender, event.Release(), 0, GetPipeConfigWithRetries());
1260+
}
1261+
1262+
void CmdRunCompaction(const ui32 partition,
1263+
TTestContext& tc)
1264+
{
1265+
CmdRunCompaction(*tc.Runtime, tc.TabletId, tc.Edge, partition);
1266+
}
1267+
12531268
} // namespace NKikimr::NPQ

ydb/core/persqueue/ut/common/pq_ut_common.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -644,6 +644,13 @@ struct TCmdWriteOptions {
644644
};
645645
void CmdWrite(const TCmdWriteOptions&);
646646

647+
void CmdRunCompaction(TTestActorRuntime& runtime,
648+
ui64 tabletId,
649+
const TActorId& sender,
650+
const ui32 partition);
651+
void CmdRunCompaction(const ui32 partition,
652+
TTestContext& tc);
653+
647654
THolder<TEvPersQueue::TEvPeriodicTopicStats> GetReadBalancerPeriodicTopicStats(TTestActorRuntime& runtime, ui64 balancerId);
648655

649656
} // namespace NKikimr::NPQ

ydb/core/persqueue/ut/pq_ut.cpp

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,52 @@ TMaybe<ui64> PQGetStartOffset(TTestContext& tc)
5252
return Nothing();
5353
}
5454

55+
Y_UNIT_TEST(TestCompaction) {
56+
TTestContext tc;
57+
tc.EnableDetailedPQLog = true;
58+
RunTestWithReboots(tc.TabletIds, [&]() {
59+
return tc.InitialEventsFilter.Prepare();
60+
}, [&](const TString& dispatchName, std::function<void(TTestActorRuntime&)> setup, bool& activeZone) {
61+
activeZone = false;
62+
TFinalizer finalizer(tc);
63+
tc.Prepare(dispatchName, setup, activeZone);
64+
activeZone = false;
65+
tc.Runtime->SetScheduledLimit(1000);
66+
67+
ui32 sourceIdx = 0;
68+
auto cmdWrite = [&](const TVector<size_t>& sizes) {
69+
TVector<std::pair<ui64, TString>> data;
70+
for (size_t k = 1; k <= sizes.size(); ++k) {
71+
data.emplace_back(k, TString(sizes[k - 1], 'x'));
72+
}
73+
TString sourceId = "sourceid_" + ToString(sourceIdx++);
74+
CmdWrite(0, sourceId, data, tc, false, {}, false, "", -1, -1, false, false, true);
75+
};
76+
auto cmdCompaction = [&]() {
77+
CmdRunCompaction(0, tc);
78+
};
79+
80+
PQTabletPrepare({.partitions = 1, .writeSpeed = 50_MB}, {{"user1", true}}, tc);
81+
82+
cmdWrite({17400_KB});
83+
cmdCompaction();
84+
85+
cmdWrite({16800_KB});
86+
cmdCompaction();
87+
88+
PQTabletRestart(tc);
89+
90+
cmdWrite({7000_KB, 13300_KB});
91+
cmdCompaction();
92+
93+
cmdWrite({1_KB});
94+
95+
PQTabletRestart(tc);
96+
97+
PQGetPartInfo(0, 4 + 1, tc);
98+
});
99+
}
100+
55101
Y_UNIT_TEST(TestCmdReadWithLastOffset) {
56102
TTestContext tc;
57103
tc.EnableDetailedPQLog = true;

0 commit comments

Comments
 (0)