Skip to content

Commit c4cc379

Browse files
committed
add more tests
1 parent c6b9ab9 commit c4cc379

File tree

2 files changed

+165
-7
lines changed

2 files changed

+165
-7
lines changed

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/deltajoin/DeltaJoinCache.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,17 @@
4141
* <p>Note: This cache is not thread-safe although its inner {@link Cache} is thread-safe.
4242
*/
4343
@NotThreadSafe
44+
@VisibleForTesting
4445
public class DeltaJoinCache {
4546

46-
private static final String LEFT_CACHE_METRIC_PREFIX = "deltaJoin.leftCache.";
47-
private static final String RIGHT_CACHE_METRIC_PREFIX = "deltaJoin.rightCache.";
47+
protected static final String LEFT_CACHE_METRIC_PREFIX = "deltaJoin.leftCache.";
48+
protected static final String RIGHT_CACHE_METRIC_PREFIX = "deltaJoin.rightCache.";
4849

49-
private static final String METRIC_HIT_RATE = "hitRate";
50-
private static final String METRIC_REQUEST_COUNT = "requestCount";
51-
private static final String METRIC_HIT_COUNT = "hitCount";
52-
private static final String METRIC_KEY_SIZE = "keySize";
53-
private static final String METRIC_TOTAL_NON_EMPTY_VALUE_SIZE = "totalNonEmptyValues";
50+
protected static final String METRIC_HIT_RATE = "hitRate";
51+
protected static final String METRIC_REQUEST_COUNT = "requestCount";
52+
protected static final String METRIC_HIT_COUNT = "hitCount";
53+
protected static final String METRIC_KEY_SIZE = "keySize";
54+
protected static final String METRIC_TOTAL_NON_EMPTY_VALUE_SIZE = "totalNonEmptyValues";
5455

5556
// use LinkedHashMap to keep order
5657
private final Cache<RowData, LinkedHashMap<RowData, Object>> leftCache;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.runtime.operators.join.deltajoin;
20+
21+
import org.apache.flink.metrics.Gauge;
22+
import org.apache.flink.metrics.Metric;
23+
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
24+
import org.apache.flink.table.data.RowData;
25+
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
26+
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
27+
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
28+
import org.apache.flink.util.function.TriConsumer;
29+
30+
import org.apache.flink.shaded.guava33.com.google.common.collect.Maps;
31+
32+
import org.junit.jupiter.api.BeforeEach;
33+
import org.junit.jupiter.api.TestTemplate;
34+
import org.junit.jupiter.api.extension.ExtendWith;
35+
36+
import java.util.Arrays;
37+
import java.util.HashMap;
38+
import java.util.LinkedHashMap;
39+
import java.util.List;
40+
import java.util.Map;
41+
import java.util.function.BiConsumer;
42+
43+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.LEFT_CACHE_METRIC_PREFIX;
44+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_HIT_COUNT;
45+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_HIT_RATE;
46+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_KEY_SIZE;
47+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_REQUEST_COUNT;
48+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.METRIC_TOTAL_NON_EMPTY_VALUE_SIZE;
49+
import static org.apache.flink.table.runtime.operators.join.deltajoin.DeltaJoinCache.RIGHT_CACHE_METRIC_PREFIX;
50+
import static org.apache.flink.table.runtime.util.StreamRecordUtils.row;
51+
import static org.assertj.core.api.Assertions.assertThat;
52+
53+
/** Test for {@link DeltaJoinCache}. */
54+
@ExtendWith(ParameterizedTestExtension.class)
55+
class DeltaJoinCacheTest {
56+
57+
private static final Long LEFT_CACHE_SIZE = 3L;
58+
private static final Long RIGHT_CACHE_SIZE = 2L;
59+
60+
@Parameters(name = "testRightCache = {0}")
61+
private static List<Boolean> parameters() {
62+
return Arrays.asList(false, true);
63+
}
64+
65+
@Parameter private boolean testRightCache;
66+
67+
private DeltaJoinCache cache;
68+
private Runnable requestCacheFunc;
69+
private Runnable hitCacheFunc;
70+
private BiConsumer<RowData, LinkedHashMap<RowData, Object>> buildCacheFunc;
71+
private TriConsumer<RowData, RowData, Object> upsertCacheFunc;
72+
73+
@BeforeEach
74+
void before() {
75+
cache = new DeltaJoinCache(LEFT_CACHE_SIZE, RIGHT_CACHE_SIZE);
76+
77+
requestCacheFunc =
78+
() -> {
79+
if (testRightCache) {
80+
cache.requestRightCache();
81+
} else {
82+
cache.requestLeftCache();
83+
}
84+
};
85+
hitCacheFunc =
86+
() -> {
87+
if (testRightCache) {
88+
cache.hitRightCache();
89+
} else {
90+
cache.hitLeftCache();
91+
}
92+
};
93+
buildCacheFunc = (key, ukDataMap) -> cache.buildCache(key, ukDataMap, testRightCache);
94+
upsertCacheFunc = (key, uk, data) -> cache.upsertCache(key, uk, data, testRightCache);
95+
}
96+
97+
@TestTemplate
98+
void testReportMetrics() {
99+
Map<String, Metric> allMetrics = new HashMap<>();
100+
cache.registerMetrics(
101+
new UnregisteredMetricGroups.UnregisteredOperatorMetricGroup() {
102+
@Override
103+
protected void addMetric(String name, Metric metric) {
104+
allMetrics.put(name, metric);
105+
super.addMetric(name, metric);
106+
}
107+
});
108+
109+
assertReportMetricsInternal(allMetrics, 0, 0, 0.0, 0, 0);
110+
requestCacheFunc.run();
111+
assertReportMetricsInternal(allMetrics, 1, 0, 0.0, 0, 0);
112+
hitCacheFunc.run();
113+
assertReportMetricsInternal(allMetrics, 1, 1, 1.0, 0, 0);
114+
requestCacheFunc.run();
115+
assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 0, 0);
116+
117+
buildCacheFunc.accept(row("ck1"), Maps.newLinkedHashMap());
118+
assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 1, 0);
119+
buildCacheFunc.accept(
120+
row("ck2"),
121+
Maps.newLinkedHashMap(Map.of(row("pk1"), 1, row("pk2"), 2, row("pk3"), 3)));
122+
assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 2, 3);
123+
upsertCacheFunc.accept(row("ck1"), row("pk4"), 4);
124+
assertReportMetricsInternal(allMetrics, 2, 1, 0.5, 2, 4);
125+
}
126+
127+
@SuppressWarnings("unchecked")
128+
private void assertReportMetricsInternal(
129+
Map<String, Metric> actualAllMetrics,
130+
long expectedRequestCount,
131+
long expectedHitCount,
132+
double expectedHitRate,
133+
long expectedKeySize,
134+
long expectedNonEmptyValueSize) {
135+
String prefix = testRightCache ? RIGHT_CACHE_METRIC_PREFIX : LEFT_CACHE_METRIC_PREFIX;
136+
137+
String hitRate = prefix + METRIC_HIT_RATE;
138+
assertThat(((Gauge<Double>) actualAllMetrics.get(hitRate)).getValue())
139+
.isEqualTo(expectedHitRate);
140+
141+
String requestCount = prefix + METRIC_REQUEST_COUNT;
142+
assertThat(((Gauge<Long>) actualAllMetrics.get(requestCount)).getValue())
143+
.isEqualTo(expectedRequestCount);
144+
145+
String hitCount = prefix + METRIC_HIT_COUNT;
146+
assertThat(((Gauge<Long>) actualAllMetrics.get(hitCount)).getValue())
147+
.isEqualTo(expectedHitCount);
148+
149+
String keySize = prefix + METRIC_KEY_SIZE;
150+
assertThat(((Gauge<Long>) actualAllMetrics.get(keySize)).getValue())
151+
.isEqualTo(expectedKeySize);
152+
153+
String totalNonEmptyValueSize = prefix + METRIC_TOTAL_NON_EMPTY_VALUE_SIZE;
154+
assertThat(((Gauge<Long>) actualAllMetrics.get(totalNonEmptyValueSize)).getValue())
155+
.isEqualTo(expectedNonEmptyValueSize);
156+
}
157+
}

0 commit comments

Comments
 (0)