Skip to content

Commit 09dd449

Browse files
committed
add also intergration tests for streams and clients + address Andrew comment
Signed-off-by: see-quick <[email protected]>
1 parent 4b1fb23 commit 09dd449

File tree

8 files changed

+483
-45
lines changed

8 files changed

+483
-45
lines changed

checkstyle/import-control.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,9 @@
449449
<allow class="org.apache.kafka.server.telemetry.ClientTelemetry" />
450450
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryPayload" />
451451
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryReceiver" />
452+
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryExporter" />
453+
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryExporterProvider" />
454+
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryContext" />
452455
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
453456
<allow class="org.apache.kafka.coordinator.transaction.TransactionLogConfig" />
454457
<allow pkg="org.apache.kafka.coordinator.group" />

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ClientTelemetryTest.java

Lines changed: 106 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@
3939
import org.apache.kafka.common.test.ClusterInstance;
4040
import org.apache.kafka.common.test.api.ClusterConfigProperty;
4141
import org.apache.kafka.common.test.api.ClusterTest;
42+
import org.apache.kafka.common.test.api.ClusterTests;
4243
import org.apache.kafka.common.test.api.Type;
4344
import org.apache.kafka.server.telemetry.ClientTelemetry;
45+
import org.apache.kafka.server.telemetry.ClientTelemetryExporter;
46+
import org.apache.kafka.server.telemetry.ClientTelemetryExporterProvider;
4447
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
4548

4649
import java.time.Duration;
@@ -53,6 +56,7 @@
5356
import java.util.Map;
5457
import java.util.Set;
5558
import java.util.UUID;
59+
import java.util.concurrent.ConcurrentHashMap;
5660
import java.util.concurrent.ExecutionException;
5761
import java.util.stream.Collectors;
5862

@@ -66,12 +70,27 @@
6670

6771
public class ClientTelemetryTest {
6872

69-
@ClusterTest(
70-
types = Type.KRAFT,
71-
brokers = 3,
72-
serverProperties = {
73-
@ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "org.apache.kafka.clients.admin.ClientTelemetryTest$GetIdClientTelemetry"),
74-
})
73+
@ClusterTests({
74+
@ClusterTest(
75+
types = Type.KRAFT,
76+
brokers = 3,
77+
serverProperties = {
78+
@ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "org.apache.kafka.clients.admin.ClientTelemetryTest$GetIdClientTelemetry"),
79+
}),
80+
@ClusterTest(
81+
types = Type.KRAFT,
82+
brokers = 3,
83+
serverProperties = {
84+
@ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "org.apache.kafka.clients.admin.ClientTelemetryTest$TelemetryExporter"),
85+
}),
86+
@ClusterTest(
87+
types = Type.KRAFT,
88+
brokers = 3,
89+
serverProperties = {
90+
@ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG,
91+
value = "org.apache.kafka.clients.admin.ClientTelemetryTest$GetIdClientTelemetry,org.apache.kafka.clients.admin.ClientTelemetryTest$TelemetryExporter"),
92+
})
93+
})
7594
public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
7695
Map<String, Object> configs = new HashMap<>();
7796
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
@@ -135,6 +154,51 @@ public void testIntervalMsParser(ClusterInstance clusterInstance) {
135154
}
136155
}
137156

157+
@ClusterTest(types = Type.KRAFT,
158+
brokers = 3,
159+
serverProperties = {
160+
@ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "org.apache.kafka.clients.admin.ClientTelemetryTest$TelemetryExporter")
161+
})
162+
public void testPushInterval(ClusterInstance clusterInstance) throws Exception {
163+
final int expectedIntervalMs = 1000;
164+
List<String> alterOpts = asList("--bootstrap-server", clusterInstance.bootstrapServers(),
165+
"--alter", "--entity-type", "client-metrics", "--entity-name", "test-interval",
166+
"--add-config", "interval.ms=" + expectedIntervalMs + ",metrics=org.apache.kafka.admin");
167+
168+
try (Admin adminForConfig = clusterInstance.admin()) {
169+
ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(Set.of(alterOpts)));
170+
ConfigCommand.alterConfig(adminForConfig, addOpts);
171+
}
172+
173+
Map<String, Object> configs = new HashMap<>();
174+
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers());
175+
configs.put(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG, true);
176+
177+
try (Admin admin = Admin.create(configs)) {
178+
Uuid uuid = admin.clientInstanceId(Duration.ofSeconds(10));
179+
assertNotNull(uuid);
180+
181+
long startTime = System.currentTimeMillis();
182+
// Wait for at least 2 more pushes to verify metrics are being pushed at the expected interval
183+
while (TelemetryExporter.PUSH_TIMESTAMPS.getOrDefault(uuid, Collections.emptyList()).size() < 3
184+
&& System.currentTimeMillis() - startTime < 5000) {
185+
Thread.sleep(100);
186+
}
187+
188+
List<Long> timestamps = TelemetryExporter.PUSH_TIMESTAMPS.get(uuid);
189+
190+
assertNotNull(timestamps, "Should have captured push timestamps");
191+
assertTrue(timestamps.size() >= 2, "Should have received at least 2 metric pushes, got: " + timestamps.size());
192+
193+
// Verify the interval between pushes is approximately 1 second
194+
for (int i = 1; i < timestamps.size(); i++) {
195+
long intervalMs = timestamps.get(i) - timestamps.get(i - 1);
196+
assertTrue(intervalMs >= 800 && intervalMs <= 1200,
197+
"Interval between pushes should be approximately " + expectedIntervalMs + "ms (allowing for jitter), but was: " + intervalMs + "ms");
198+
}
199+
}
200+
}
201+
138202
@ClusterTest(types = Type.KRAFT)
139203
public void testMetrics(ClusterInstance clusterInstance) {
140204
Map<String, Object> configs = new HashMap<>();
@@ -192,4 +256,40 @@ public ClientTelemetryReceiver clientReceiver() {
192256
}
193257
}
194258

259+
@SuppressWarnings("unused")
260+
public static class TelemetryExporter implements ClientTelemetryExporterProvider, MetricsReporter {
261+
262+
public static final Map<Uuid, Integer> PUSH_INTERVALS = new ConcurrentHashMap<>();
263+
public static final Map<Uuid, List<Long>> PUSH_TIMESTAMPS = new ConcurrentHashMap<>();
264+
265+
@Override
266+
public void init(List<KafkaMetric> metrics) {
267+
}
268+
269+
@Override
270+
public void metricChange(KafkaMetric metric) {
271+
}
272+
273+
@Override
274+
public void metricRemoval(KafkaMetric metric) {
275+
}
276+
277+
@Override
278+
public void close() {
279+
}
280+
281+
@Override
282+
public void configure(Map<String, ?> configs) {
283+
}
284+
285+
@Override
286+
public ClientTelemetryExporter clientTelemetryExporter() {
287+
return (context, payload) -> {
288+
Uuid clientId = payload.clientInstanceId();
289+
PUSH_INTERVALS.put(clientId, context.pushIntervalMs());
290+
PUSH_TIMESTAMPS.computeIfAbsent(clientId, k -> new ArrayList<>()).add(System.currentTimeMillis());
291+
};
292+
}
293+
}
294+
195295
}

clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryExporterProvider.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121

2222
/**
2323
* A {@link MetricsReporter} may implement this interface to indicate support for collecting client
24-
* telemetry on the server side using the new exporter API. This interface replaces the deprecated
25-
* {@link ClientTelemetry} interface.
24+
* telemetry on the server side using the new exporter API.
2625
*/
2726
public interface ClientTelemetryExporterProvider {
2827

server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryContext.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,7 @@
2222
/**
2323
* Default implementation of {@link ClientTelemetryContext}.
2424
*/
25-
public class DefaultClientTelemetryContext implements ClientTelemetryContext {
25+
public record DefaultClientTelemetryContext(int pushIntervalMs,
26+
AuthorizableRequestContext authorizableRequestContext) implements ClientTelemetryContext {
2627

27-
private final int pushIntervalMs;
28-
private final AuthorizableRequestContext authorizableRequestContext;
29-
30-
public DefaultClientTelemetryContext(int pushIntervalMs, AuthorizableRequestContext authorizableRequestContext) {
31-
this.pushIntervalMs = pushIntervalMs;
32-
this.authorizableRequestContext = authorizableRequestContext;
33-
}
34-
35-
@Override
36-
public int pushIntervalMs() {
37-
return pushIntervalMs;
38-
}
39-
40-
@Override
41-
public AuthorizableRequestContext authorizableRequestContext() {
42-
return authorizableRequestContext;
43-
}
4428
}

0 commit comments

Comments
 (0)