Skip to content

Commit 9148e14

Browse files
committed
[FLINK-38695][table-runtime] Fix wrong metric about left cache request count in DeltaJoinCache
1 parent bdbc8b5 commit 9148e14

File tree

3 files changed

+310
-24
lines changed

3 files changed

+310
-24
lines changed

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/DeltaJoinCache.java

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,25 @@
4141
* <p>Note: This cache is not thread-safe although its inner {@link Cache} is thread-safe.
4242
*/
4343
@NotThreadSafe
44+
@VisibleForTesting
4445
public class DeltaJoinCache {
4546

46-
private static final String LEFT_CACHE_METRIC_PREFIX = "deltaJoin.leftCache.";
47-
private static final String RIGHT_CACHE_METRIC_PREFIX = "deltaJoin.rightCache.";
47+
protected static final String LEFT_CACHE_METRIC_PREFIX = "deltaJoin.leftCache.";
48+
protected static final String RIGHT_CACHE_METRIC_PREFIX = "deltaJoin.rightCache.";
4849

49-
private static final String METRIC_HIT_RATE = "hitRate";
50-
private static final String METRIC_REQUEST_COUNT = "requestCount";
51-
private static final String METRIC_HIT_COUNT = "hitCount";
52-
private static final String METRIC_KEY_SIZE = "keySize";
53-
private static final String METRIC_TOTAL_NON_EMPTY_VALUE_SIZE = "totalNonEmptyValues";
50+
protected static final String METRIC_HIT_RATE = "hitRate";
51+
protected static final String METRIC_REQUEST_COUNT = "requestCount";
52+
protected static final String METRIC_HIT_COUNT = "hitCount";
53+
protected static final String METRIC_KEY_SIZE = "keySize";
54+
protected static final String METRIC_TOTAL_NON_EMPTY_VALUE_SIZE = "totalNonEmptyValues";
5455

5556
// use LinkedHashMap to keep order
5657
private final Cache<RowData, LinkedHashMap<RowData, Object>> leftCache;
5758
private final Cache<RowData, LinkedHashMap<RowData, Object>> rightCache;
5859

5960
// metrics
60-
private final AtomicLong leftTotalSize = new AtomicLong(0L);
61-
private final AtomicLong rightTotalSize = new AtomicLong(0L);
61+
private final AtomicLong leftTotalNonEmptyValueSize = new AtomicLong(0L);
62+
private final AtomicLong rightTotalNonEmptyValueSize = new AtomicLong(0L);
6263
private final AtomicLong leftHitCount = new AtomicLong(0L);
6364
private final AtomicLong leftRequestCount = new AtomicLong(0L);
6465
private final AtomicLong rightHitCount = new AtomicLong(0L);
@@ -87,14 +88,15 @@ public void registerMetrics(MetricGroup metricGroup) {
8788
: Long.valueOf(leftHitCount.get()).doubleValue()
8889
/ leftRequestCount.get());
8990
metricGroup.<Long, Gauge<Long>>gauge(
90-
LEFT_CACHE_METRIC_PREFIX + METRIC_REQUEST_COUNT, rightRequestCount::get);
91+
LEFT_CACHE_METRIC_PREFIX + METRIC_REQUEST_COUNT, leftRequestCount::get);
9192
metricGroup.<Long, Gauge<Long>>gauge(
9293
LEFT_CACHE_METRIC_PREFIX + METRIC_HIT_COUNT, leftHitCount::get);
9394
metricGroup.<Long, Gauge<Long>>gauge(
9495
LEFT_CACHE_METRIC_PREFIX + METRIC_KEY_SIZE, leftCache::size);
9596

9697
metricGroup.<Long, Gauge<Long>>gauge(
97-
LEFT_CACHE_METRIC_PREFIX + METRIC_TOTAL_NON_EMPTY_VALUE_SIZE, leftTotalSize::get);
98+
LEFT_CACHE_METRIC_PREFIX + METRIC_TOTAL_NON_EMPTY_VALUE_SIZE,
99+
leftTotalNonEmptyValueSize::get);
98100

99101
// right cache metric
100102
metricGroup.<Double, Gauge<Double>>gauge(
@@ -111,7 +113,8 @@ public void registerMetrics(MetricGroup metricGroup) {
111113
metricGroup.<Long, Gauge<Long>>gauge(
112114
RIGHT_CACHE_METRIC_PREFIX + METRIC_KEY_SIZE, rightCache::size);
113115
metricGroup.<Long, Gauge<Long>>gauge(
114-
RIGHT_CACHE_METRIC_PREFIX + METRIC_TOTAL_NON_EMPTY_VALUE_SIZE, rightTotalSize::get);
116+
RIGHT_CACHE_METRIC_PREFIX + METRIC_TOTAL_NON_EMPTY_VALUE_SIZE,
117+
rightTotalNonEmptyValueSize::get);
115118
}
116119

117120
@Nullable
@@ -124,18 +127,18 @@ public void buildCache(
124127
Preconditions.checkState(getData(key, buildRightCache) == null);
125128
if (buildRightCache) {
126129
rightCache.put(key, ukDataMap);
127-
rightTotalSize.addAndGet(ukDataMap.size());
130+
rightTotalNonEmptyValueSize.addAndGet(ukDataMap.size());
128131
} else {
129132
leftCache.put(key, ukDataMap);
130-
leftTotalSize.addAndGet(ukDataMap.size());
133+
leftTotalNonEmptyValueSize.addAndGet(ukDataMap.size());
131134
}
132135
}
133136

134137
public void upsertCache(RowData key, RowData uk, Object data, boolean upsertRightCache) {
135138
if (upsertRightCache) {
136-
upsert(rightCache, key, uk, data, rightTotalSize);
139+
upsert(rightCache, key, uk, data, rightTotalNonEmptyValueSize);
137140
} else {
138-
upsert(leftCache, key, uk, data, leftTotalSize);
141+
upsert(leftCache, key, uk, data, leftTotalNonEmptyValueSize);
139142
}
140143
}
141144

@@ -190,9 +193,9 @@ public void onRemoval(
190193
}
191194

192195
if (isLeftCache) {
193-
leftTotalSize.addAndGet(-removalNotification.getValue().size());
196+
leftTotalNonEmptyValueSize.addAndGet(-removalNotification.getValue().size());
194197
} else {
195-
rightTotalSize.addAndGet(-removalNotification.getValue().size());
198+
rightTotalNonEmptyValueSize.addAndGet(-removalNotification.getValue().size());
196199
}
197200
}
198201
}
@@ -210,13 +213,13 @@ public Cache<RowData, LinkedHashMap<RowData, Object>> getRightCache() {
210213
}
211214

212215
@VisibleForTesting
213-
public AtomicLong getLeftTotalSize() {
214-
return leftTotalSize;
216+
public AtomicLong getLeftTotalNonEmptyValueSize() {
217+
return leftTotalNonEmptyValueSize;
215218
}
216219

217220
@VisibleForTesting
218-
public AtomicLong getRightTotalSize() {
219-
return rightTotalSize;
221+
public AtomicLong getRightTotalNonEmptyValueSize() {
222+
return rightTotalNonEmptyValueSize;
220223
}
221224

222225
@VisibleForTesting
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.operators.join.deltajoin;
20+
21+
import org.apache.flink.metrics.Gauge;
22+
import org.apache.flink.metrics.Metric;
23+
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
24+
import org.apache.flink.table.data.RowData;
25+
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
26+
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
27+
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
28+
import org.apache.flink.util.function.TriConsumer;
29+
30+
import org.apache.flink.shaded.guava33.com.google.common.collect.Maps;
31+
32+
import org.junit.jupiter.api.Assumptions;
33+
import org.junit.jupiter.api.BeforeEach;
34+
import org.junit.jupiter.api.TestTemplate;
35+
import org.junit.jupiter.api.extension.ExtendWith;
36+
37+
import java.util.Arrays;
38+
import java.util.HashMap;
39+
import java.util.LinkedHashMap;
40+
import java.util.List;
41+
import java.util.Map;
42+
import java.util.function.BiConsumer;
43+
44+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.LEFT_CACHE_METRIC_PREFIX;
45+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_HIT_COUNT;
46+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_HIT_RATE;
47+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_KEY_SIZE;
48+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_REQUEST_COUNT;
49+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_TOTAL_NON_EMPTY_VALUE_SIZE;
50+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.RIGHT_CACHE_METRIC_PREFIX;
51+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
52+
import static org.assertj.core.api.Assertions.assertThat;
53+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
54+
55+
/** Test for {@link DeltaJoinCache}. */
56+
@ExtendWith(ParameterizedTestExtension.class)
57+
class DeltaJoinCacheTest {
58+
59+
private static final Long LEFT_CACHE_SIZE = 3L;
60+
private static final Long RIGHT_CACHE_SIZE = 2L;
61+
62+
@Parameters(name = "testRightCache = {0}")
63+
private static List<Boolean> parameters() {
64+
return Arrays.asList(false, true);
65+
}
66+
67+
@Parameter private boolean testRightCache;
68+
69+
private DeltaJoinCache cache;
70+
private Runnable requestCacheFunc;
71+
private Runnable hitCacheFunc;
72+
private BiConsumer<RowData, LinkedHashMap<RowData, Object>> buildCacheFunc;
73+
private TriConsumer<RowData, RowData, Object> upsertCacheFunc;
74+
75+
@BeforeEach
76+
void before() {
77+
cache = new DeltaJoinCache(LEFT_CACHE_SIZE, RIGHT_CACHE_SIZE);
78+
79+
requestCacheFunc =
80+
() -> {
81+
if (testRightCache) {
82+
cache.requestRightCache();
83+
} else {
84+
cache.requestLeftCache();
85+
}
86+
};
87+
hitCacheFunc =
88+
() -> {
89+
if (testRightCache) {
90+
cache.hitRightCache();
91+
} else {
92+
cache.hitLeftCache();
93+
}
94+
};
95+
buildCacheFunc = (key, ukDataMap) -> cache.buildCache(key, ukDataMap, testRightCache);
96+
upsertCacheFunc = (key, uk, data) -> cache.upsertCache(key, uk, data, testRightCache);
97+
}
98+
99+
@TestTemplate
100+
void testReportMetrics() {
101+
Map<String, Metric> allMetrics = new HashMap<>();
102+
cache.registerMetrics(
103+
new UnregisteredMetricGroups.UnregisteredOperatorMetricGroup() {
104+
@Override
105+
protected void addMetric(String name, Metric metric) {
106+
allMetrics.put(name, metric);
107+
super.addMetric(name, metric);
108+
}
109+
});
110+
111+
assertReportMetricsInternal(allMetrics, 0, 0, 0.0, 0, 0);
112+
requestCacheFunc.run();
113+
assertReportMetricsInternal(allMetrics, 1, 0, 0.0, 0, 0);
114+
hitCacheFunc.run();
115+
assertReportMetricsInternal(allMetrics, 1, 1, 1.0, 0, 0);
116+
requestCacheFunc.run();
117+
assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 0, 0);
118+
119+
buildCacheFunc.accept(row("ck1"), Maps.newLinkedHashMap());
120+
assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 1, 0);
121+
buildCacheFunc.accept(
122+
row("ck2"),
123+
Maps.newLinkedHashMap(Map.of(row("pk1"), 1, row("pk2"), 2, row("pk3"), 3)));
124+
assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 2, 3);
125+
upsertCacheFunc.accept(row("ck1"), row("pk4"), 4);
126+
assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 2, 4);
127+
}
128+
129+
@SuppressWarnings("unchecked")
130+
private void assertReportMetricsInternal(
131+
Map<String, Metric> actualAllMetrics,
132+
long expectedRequestCount,
133+
long expectedHitCount,
134+
double expectedHitRate,
135+
long expectedKeySize,
136+
long expectedNonEmptyValueSize) {
137+
String prefix = testRightCache ? RIGHT_CACHE_METRIC_PREFIX : LEFT_CACHE_METRIC_PREFIX;
138+
139+
String hitRate = prefix + METRIC_HIT_RATE;
140+
assertThat(((Gauge<Double>) actualAllMetrics.get(hitRate)).getValue())
141+
.isEqualTo(expectedHitRate);
142+
143+
String requestCount = prefix + METRIC_REQUEST_COUNT;
144+
assertThat(((Gauge<Long>) actualAllMetrics.get(requestCount)).getValue())
145+
.isEqualTo(expectedRequestCount);
146+
147+
String hitCount = prefix + METRIC_HIT_COUNT;
148+
assertThat(((Gauge<Long>) actualAllMetrics.get(hitCount)).getValue())
149+
.isEqualTo(expectedHitCount);
150+
151+
String keySize = prefix + METRIC_KEY_SIZE;
152+
assertThat(((Gauge<Long>) actualAllMetrics.get(keySize)).getValue())
153+
.isEqualTo(expectedKeySize);
154+
155+
String totalNonEmptyValueSize = prefix + METRIC_TOTAL_NON_EMPTY_VALUE_SIZE;
156+
assertThat(((Gauge<Long>) actualAllMetrics.get(totalNonEmptyValueSize)).getValue())
157+
.isEqualTo(expectedNonEmptyValueSize);
158+
}
159+
160+
@TestTemplate
161+
void testGetDataAfterBuildingCache() {
162+
Assumptions.assumeTrue(testRightCache);
163+
164+
RowData key = row("ck");
165+
LinkedHashMap<RowData, Object> rightPkDataMap =
166+
Maps.newLinkedHashMap(
167+
Map.of(row("pk1"), 1, row("pk2"), 2, row("pk3"), 3, row("pk4"), 4));
168+
cache.buildCache(key, rightPkDataMap, true);
169+
170+
assertThat(cache.getData(key, true)).isEqualTo(rightPkDataMap);
171+
assertThat(cache.getData(key, false)).isNull();
172+
assertThat(cache.getRightTotalNonEmptyValueSize().get()).isEqualTo(rightPkDataMap.size());
173+
174+
LinkedHashMap<RowData, Object> leftPkDataMap =
175+
Maps.newLinkedHashMap(Map.of(row("pk5"), 1, row("pk6"), 2));
176+
cache.buildCache(key, leftPkDataMap, false);
177+
assertThat(cache.getData(key, false)).isEqualTo(leftPkDataMap);
178+
assertThat(cache.getLeftTotalNonEmptyValueSize().get()).isEqualTo(leftPkDataMap.size());
179+
// right cache should not be affected
180+
assertThat(cache.getData(key, true)).isEqualTo(rightPkDataMap);
181+
}
182+
183+
@TestTemplate
184+
void testNotExceedCacheMaxSizeWithSingleKey() {
185+
RowData key1 = row("ck1");
186+
RowData key2 = row("ck2");
187+
188+
cache.buildCache(
189+
key1,
190+
Maps.newLinkedHashMap(
191+
Map.of(row("pk1"), 1, row("pk2"), 2, row("pk3"), 3, row("pk4"), 4)),
192+
testRightCache);
193+
cache.buildCache(
194+
key2,
195+
Maps.newLinkedHashMap(Map.of(row("pk1"), 1, row("pk2"), 2, row("pk3"), 3)),
196+
testRightCache);
197+
198+
if (testRightCache) {
199+
assertThat(cache.getRightCache().size()).isEqualTo(2);
200+
} else {
201+
assertThat(cache.getLeftCache().size()).isEqualTo(2);
202+
}
203+
assertThat(cache.getData(key1, testRightCache)).isNotNull().hasSize(4);
204+
assertThat(cache.getData(key2, testRightCache)).isNotNull().hasSize(3);
205+
}
206+
207+
@TestTemplate
208+
void testExceedCacheMaxSizeWithMultiKeys() {
209+
long testKeySize;
210+
if (testRightCache) {
211+
testKeySize = RIGHT_CACHE_SIZE + 1;
212+
} else {
213+
testKeySize = LEFT_CACHE_SIZE + 1;
214+
}
215+
216+
Map<RowData, LinkedHashMap<RowData, Object>> expectedCacheMap = new HashMap<>();
217+
long totalSize = 0;
218+
for (long i = 0; i < testKeySize; i++) {
219+
RowData key = row("ck" + i);
220+
LinkedHashMap<RowData, Object> pkDataMap = new LinkedHashMap<>();
221+
long pkDataMapSize = i + 1;
222+
for (long j = 0; j < pkDataMapSize; j++) {
223+
pkDataMap.put(row("pk" + j), j);
224+
}
225+
cache.buildCache(key, pkDataMap, testRightCache);
226+
227+
// the first key should be expired
228+
if (i != 0) {
229+
expectedCacheMap.put(key, pkDataMap);
230+
totalSize += pkDataMapSize;
231+
}
232+
}
233+
234+
if (testRightCache) {
235+
assertThat(cache.getRightCache().size()).isEqualTo(RIGHT_CACHE_SIZE);
236+
assertThat(cache.getRightCache().asMap()).isEqualTo(expectedCacheMap);
237+
assertThat(cache.getRightTotalNonEmptyValueSize().get()).isEqualTo(totalSize);
238+
} else {
239+
assertThat(cache.getLeftCache().size()).isEqualTo(LEFT_CACHE_SIZE);
240+
assertThat(cache.getLeftCache().asMap()).isEqualTo(expectedCacheMap);
241+
assertThat(cache.getLeftTotalNonEmptyValueSize().get()).isEqualTo(totalSize);
242+
}
243+
}
244+
245+
@TestTemplate
246+
void testBuildCacheWhileCacheExists() {
247+
cache.buildCache(row("ck"), Maps.newLinkedHashMap(), testRightCache);
248+
assertThatThrownBy(
249+
() -> cache.buildCache(row("ck"), Maps.newLinkedHashMap(), testRightCache))
250+
.isInstanceOf(IllegalStateException.class);
251+
}
252+
253+
@TestTemplate
254+
void testUpsertCacheWithoutCache() {
255+
RowData key = row("ck");
256+
RowData pk = row("pk");
257+
258+
assertThat(cache.getData(key, testRightCache)).isNull();
259+
cache.upsertCache(row("ck"), pk, 1, testRightCache);
260+
assertThat(cache.getData(key, testRightCache)).isNull();
261+
}
262+
263+
@TestTemplate
264+
void testTotalAllNonEmptyValueSizeChanges() {
265+
cache.buildCache(row("ck"), Maps.newLinkedHashMap(), testRightCache);
266+
if (testRightCache) {
267+
assertThat(cache.getRightCache().size()).isEqualTo(1);
268+
assertThat(cache.getRightTotalNonEmptyValueSize().get()).isEqualTo(0);
269+
} else {
270+
assertThat(cache.getLeftCache().size()).isEqualTo(1);
271+
assertThat(cache.getLeftTotalNonEmptyValueSize().get()).isEqualTo(0);
272+
}
273+
274+
cache.upsertCache(row("ck"), row("pk"), 1, testRightCache);
275+
if (testRightCache) {
276+
assertThat(cache.getRightCache().size()).isEqualTo(1);
277+
assertThat(cache.getRightTotalNonEmptyValueSize().get()).isEqualTo(1);
278+
} else {
279+
assertThat(cache.getLeftCache().size()).isEqualTo(1);
280+
assertThat(cache.getLeftTotalNonEmptyValueSize().get()).isEqualTo(1);
281+
}
282+
}
283+
}

0 commit comments

Comments
 (0)