configs) {
}
@Override
- public ClientTelemetryReceiver clientReceiver() {
+ public ClientTelemetryExporter clientTelemetryExporter() {
return (context, payload) -> {
};
}
diff --git a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java
index d179a55a91174..3bb4db2a82de7 100644
--- a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java
+++ b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetry.java
@@ -22,7 +22,13 @@
/**
* A {@link MetricsReporter} may implement this interface to indicate support for collecting client
* telemetry on the server side.
+ *
+ * @deprecated Since 4.2.0, use {@link ClientTelemetryExporterProvider} instead. This interface will be
+ *             removed in Kafka 5.0.0. The new interface provides a {@link ClientTelemetryExporter},
+ *             which receives additional context such as the push interval.
*/
+@Deprecated(since = "4.2", forRemoval = true)
+@SuppressWarnings("removal")
public interface ClientTelemetry {
/**
diff --git a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryContext.java b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryContext.java
new file mode 100644
index 0000000000000..b62e34426a362
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryContext.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.telemetry;
+
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+
+/**
+ * {@code ClientTelemetryContext} provides context information for client telemetry requests,
+ * including the push interval.
+ */
+public interface ClientTelemetryContext {
+
+ /**
+ * Returns the interval at which the client pushes telemetry metrics to the broker.
+ * This is the interval from the subscription.
+ *
+ * Note that for the initial metric push and pushes following a subscription update
+ * or error, a jitter (between 0.5x and 1.5x of this interval) is applied to avoid
+ * multiple clients sending requests simultaneously.
+ *
+ * This value can be used by metrics exporters to determine when metrics should be
+ * considered stale or expired.
+ *
+ * @return the push interval in milliseconds from the subscription
+ */
+ int pushIntervalMs();
+
+ /**
+ * Returns the authorization context for the client request.
+ *
+ * @return the client request context for the corresponding {@code PushTelemetryRequest} API call
+ */
+ AuthorizableRequestContext authorizableRequestContext();
+}
diff --git a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryExporter.java b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryExporter.java
new file mode 100644
index 0000000000000..8c8bf55d518c6
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryExporter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.telemetry;
+
+/**
+ * {@code ClientTelemetryExporter} defines the behavior for telemetry exporters on the broker side
+ * which receive and export client telemetry metrics along with additional context, including the
+ * push interval.
+ */
+public interface ClientTelemetryExporter {
+
+ /**
+ * Called by the broker when a client reports telemetry metrics. The telemetry context
+ * includes the push interval and authorization details which can be used by the metrics
+ * exporter to manage metric lifecycle and retrieval of additional client information.
+ *
+ * This method may be called from the request handling thread, and as such should avoid blocking.
+ *
+ * @param context the client telemetry context including push interval and request authorization context
+ * @param payload the encoded telemetry payload as sent by the client
+ */
+ void exportMetrics(ClientTelemetryContext context, ClientTelemetryPayload payload);
+}
diff --git a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryExporterProvider.java b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryExporterProvider.java
new file mode 100644
index 0000000000000..39d01cb08a4e1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryExporterProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.telemetry;
+
+import org.apache.kafka.common.metrics.MetricsReporter;
+
+/**
+ * A {@link MetricsReporter} may implement this interface to indicate support for collecting client
+ * telemetry on the server side using the new exporter API.
+ */
+public interface ClientTelemetryExporterProvider {
+
+ /**
+     * Called by the broker to fetch an instance of {@link ClientTelemetryExporter}.
+ *
+ * This instance may be cached by the broker.
+ *
+ * @return broker side instance of {@link ClientTelemetryExporter}
+ */
+ ClientTelemetryExporter clientTelemetryExporter();
+}
diff --git a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryPayload.java b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryPayload.java
index 9638366860242..6d3259f87e8d0 100644
--- a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryPayload.java
+++ b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryPayload.java
@@ -22,8 +22,8 @@
import java.nio.ByteBuffer;
/**
- * A client telemetry payload as sent by the client to the telemetry receiver. The payload is
- * received by the broker's {@link ClientTelemetryReceiver} implementation.
+ * A client telemetry payload as sent by the client to the telemetry exporter. The payload is
+ * exported using a {@link ClientTelemetryExporter}.
*/
public interface ClientTelemetryPayload {
diff --git a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryReceiver.java b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryReceiver.java
index eb60355676207..addc2a5c075cf 100644
--- a/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryReceiver.java
+++ b/clients/src/main/java/org/apache/kafka/server/telemetry/ClientTelemetryReceiver.java
@@ -22,7 +22,12 @@
/**
* {@code ClientTelemetryReceiver} defines the behaviour for telemetry receiver on the broker side
* which receives client telemetry metrics.
+ *
+ * @deprecated Since 4.2.0, use {@link ClientTelemetryExporter} instead. This interface will be
+ * removed in Kafka 5.0.0. The new interface provides additional context including
+ * the push interval to enable better metric lifecycle management.
*/
+@Deprecated(since = "4.2", forRemoval = true)
public interface ClientTelemetryReceiver {
/**
* Called by the broker when a client reports telemetry metrics. The associated request context
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 9404c2b216707..9a390bf7a2831 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -48,7 +48,7 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
-import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
+import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpStatePersister, Persister, PersisterStateManager}
import org.apache.kafka.server.share.session.ShareSessionCache
@@ -188,9 +188,9 @@ class BrokerServer(
info("Starting broker")
- val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
+ val clientTelemetryExporterPlugin = new ClientTelemetryExporterPlugin()
- config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))
+ config.dynamicConfig.initialize(Some(clientTelemetryExporterPlugin))
quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-", ProcessRole.BrokerRole.toString)
DynamicBrokerConfig.readDynamicBrokerConfigsFromSnapshot(raftManager, config, quotaManagers, logContext)
@@ -243,7 +243,7 @@ class BrokerServer(
)
clientToControllerChannelManager.start()
forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager, metrics)
- clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, config.clientTelemetryMaxBytes, time, metrics)
+ clientMetricsManager = new ClientMetricsManager(clientTelemetryExporterPlugin, config.clientTelemetryMaxBytes, time, metrics)
val apiVersionManager = new DefaultApiVersionManager(
ListenerType.BROKER,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 28a1e3bdfb247..13adbc4c43d6f 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -130,7 +130,7 @@ class ControllerServer(
try {
this.logIdent = logContext.logPrefix()
info("Starting controller")
- config.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)
+ config.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None)
maybeChangeStatus(STARTING, STARTED)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 124a4c7b78f4c..365fef6eb9759 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -44,8 +44,8 @@ import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
-import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
-import org.apache.kafka.server.telemetry.ClientTelemetry
+import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs}
+import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider}
import org.apache.kafka.snapshot.RecordsSnapshotReader
import org.apache.kafka.storage.internals.log.{LogCleaner, LogConfig}
@@ -259,12 +259,12 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private[server] val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
- private var metricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin] = _
+ private var telemetryExporterPluginOpt: Option[ClientTelemetryExporterPlugin] = _
private var currentConfig: KafkaConfig = _
- private[server] def initialize(clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
+ private[server] def initialize(clientTelemetryExporterPluginOpt: Option[ClientTelemetryExporterPlugin]): Unit = {
currentConfig = new KafkaConfig(kafkaConfig.props, false)
- metricsReceiverPluginOpt = clientMetricsReceiverPluginOpt
+ telemetryExporterPluginOpt = clientTelemetryExporterPluginOpt
}
/**
@@ -374,8 +374,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
dynamicDefaultConfigs.clone()
}
- private[server] def clientMetricsReceiverPlugin: Option[ClientMetricsReceiverPlugin] = CoreUtils.inReadLock(lock) {
- metricsReceiverPluginOpt
+ private[server] def clientTelemetryExporterPlugin: Option[ClientTelemetryExporterPlugin] = CoreUtils.inReadLock(lock) {
+ telemetryExporterPluginOpt
}
private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) {
@@ -841,18 +841,22 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me
reporters.forEach { reporter =>
metrics.addReporter(reporter)
currentReporters += reporter.getClass.getName -> reporter
- val clientTelemetryReceiver = reporter match {
- case telemetry: ClientTelemetry => telemetry.clientReceiver()
- case _ => null
- }
- if (clientTelemetryReceiver != null) {
- dynamicConfig.clientMetricsReceiverPlugin match {
- case Some(receiverPlugin) =>
- receiverPlugin.add(clientTelemetryReceiver)
- case None =>
- // Do nothing
- }
+      // Support both the deprecated ClientTelemetry and the new ClientTelemetryExporterProvider interfaces.
+      // If a class implements both, only the new ClientTelemetryExporterProvider interface is used.
+ dynamicConfig.clientTelemetryExporterPlugin match {
+ case Some(telemetryExporterPlugin) =>
+ reporter match {
+ case exporterProvider: ClientTelemetryExporterProvider =>
+ // Use new interface (i.e., takes precedence even if class also implements deprecated interface)
+ telemetryExporterPlugin.add(exporterProvider.clientTelemetryExporter())
+          case telemetry: ClientTelemetry =>
+            Option(telemetry.clientReceiver()).foreach(r => telemetryExporterPlugin.add(r))
+ case _ =>
+ // Reporter doesn't support client telemetry
+ }
+ case None =>
+ // Do nothing
}
}
KafkaBroker.notifyClusterListeners(clusterId, reporters.asScala)
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index aba9035cb7e94..1f05c7a0b63aa 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -269,7 +269,7 @@ class SharedServer(
// This is only done in tests.
metrics = new Metrics()
}
- sharedServerConfig.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)
+ sharedServerConfig.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None)
if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
brokerMetrics = new BrokerServerMetrics(metrics)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 141b5138c0753..6d269c23f6797 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.{Endpoint, Reconfigurable}
import org.apache.kafka.common.acl.{AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
import org.apache.kafka.common.internals.Plugin
-import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
+import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetric, Metrics, MetricsReporter}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.raft.QuorumConfig
@@ -37,7 +37,8 @@ import org.apache.kafka.server.DynamicThreadPool
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
-import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
+import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYammerMetrics, MetricConfigs}
+import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryContext, ClientTelemetryExporter, ClientTelemetryExporterProvider, ClientTelemetryPayload, ClientTelemetryReceiver}
import org.apache.kafka.server.util.KafkaScheduler
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, ProducerStateManagerConfig}
import org.apache.kafka.test.MockMetricsReporter
@@ -1065,6 +1066,45 @@ class DynamicBrokerConfigTest {
)
assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG))
}
+
+ @Test
+ def testClientTelemetryExporter(): Unit = {
+ val brokerId = 0
+ val origProps = TestUtils.createBrokerConfig(brokerId, port = 8181)
+ val config = KafkaConfig(origProps)
+ val metrics = mock(classOf[Metrics])
+ val telemetryPlugin = mock(classOf[ClientTelemetryExporterPlugin])
+
+ config.dynamicConfig.initialize(Some(telemetryPlugin))
+ val m = new DynamicMetricsReporters(brokerId, config, metrics, "clusterId")
+ config.dynamicConfig.addReconfigurable(m)
+
+ def updateReporter(reporterClass: Class[_]): Unit = {
+ val props = new Properties()
+ props.put(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, reporterClass.getName)
+ config.dynamicConfig.updateDefaultConfig(props)
+ }
+
+ // Reporter implementing only ClientTelemetryExporterProvider
+ updateReporter(classOf[TestExporterOnly])
+ verify(telemetryPlugin, Mockito.times(1)).add(ArgumentMatchers.any(classOf[ClientTelemetryExporter]))
+ Mockito.reset(telemetryPlugin)
+
+ // Reporter implementing only ClientTelemetryReceiver (deprecated)
+ updateReporter(classOf[TestReceiverOnly])
+ verify(telemetryPlugin, Mockito.times(1)).add(ArgumentMatchers.any(classOf[ClientTelemetryReceiver]))
+ Mockito.reset(telemetryPlugin)
+
+ // Reporter implementing both interfaces => only exporter should be used
+ updateReporter(classOf[TestReceiverAndExporter])
+ verify(telemetryPlugin, Mockito.times(1)).add(ArgumentMatchers.any(classOf[ClientTelemetryExporter]))
+ verify(telemetryPlugin, Mockito.never()).add(ArgumentMatchers.any(classOf[ClientTelemetryReceiver]))
+ Mockito.reset(telemetryPlugin)
+
+ // Reporter implementing neither interface => nothing should be added
+ updateReporter(classOf[MockMetricsReporter])
+ verifyNoMoreInteractions(telemetryPlugin)
+ }
}
class TestDynamicThreadPool extends BrokerReconfigurable {
@@ -1086,3 +1126,38 @@ class TestDynamicThreadPool extends BrokerReconfigurable {
assertEquals(100, newConfig.backgroundThreads)
}
}
+
+class TestExporterOnly extends MetricsReporter with ClientTelemetryExporterProvider {
+ override def configure(configs: util.Map[String, _]): Unit = {}
+ override def init(metrics: util.List[KafkaMetric]): Unit = {}
+ override def metricChange(metric: KafkaMetric): Unit = {}
+ override def metricRemoval(metric: KafkaMetric): Unit = {}
+ override def close(): Unit = {}
+
+ override def clientTelemetryExporter(): ClientTelemetryExporter = (_: ClientTelemetryContext, _: ClientTelemetryPayload) => {}
+}
+
+@SuppressWarnings(Array("deprecation"))
+class TestReceiverOnly extends MetricsReporter with ClientTelemetry {
+ override def configure(configs: util.Map[String, _]): Unit = {}
+ override def init(metrics: util.List[KafkaMetric]): Unit = {}
+ override def metricChange(metric: KafkaMetric): Unit = {}
+ override def metricRemoval(metric: KafkaMetric): Unit = {}
+ override def close(): Unit = {}
+
+ override def clientReceiver(): ClientTelemetryReceiver = (_: AuthorizableRequestContext, _: ClientTelemetryPayload) => {}
+}
+
+@SuppressWarnings(Array("deprecation"))
+class TestReceiverAndExporter extends MetricsReporter
+ with ClientTelemetryExporterProvider with ClientTelemetry {
+ override def configure(configs: util.Map[String, _]): Unit = {}
+ override def init(metrics: util.List[KafkaMetric]): Unit = {}
+ override def metricChange(metric: KafkaMetric): Unit = {}
+ override def metricRemoval(metric: KafkaMetric): Unit = {}
+ override def close(): Unit = {}
+
+ override def clientTelemetryExporter(): ClientTelemetryExporter = (_: ClientTelemetryContext, _: ClientTelemetryPayload) => {}
+
+ override def clientReceiver(): ClientTelemetryReceiver = (_: AuthorizableRequestContext, _: ClientTelemetryPayload) => {}
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 3d801536072cc..01cd8e9663afe 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -11532,7 +11532,7 @@ class KafkaApisTest extends Logging {
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData(), true).build())
- when(clientMetricsManager.isTelemetryReceiverConfigured).thenReturn(true)
+ when(clientMetricsManager.isTelemetryExporterConfigured).thenReturn(true)
when(clientMetricsManager.processGetTelemetrySubscriptionRequest(any[GetTelemetrySubscriptionsRequest](),
any[RequestContext]())).thenReturn(new GetTelemetrySubscriptionsResponse(
new GetTelemetrySubscriptionsResponseData()))
@@ -11552,7 +11552,7 @@ class KafkaApisTest extends Logging {
val request = buildRequest(new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData(), true).build())
- when(clientMetricsManager.isTelemetryReceiverConfigured).thenReturn(true)
+ when(clientMetricsManager.isTelemetryExporterConfigured).thenReturn(true)
when(clientMetricsManager.processGetTelemetrySubscriptionRequest(any[GetTelemetrySubscriptionsRequest](),
any[RequestContext]())).thenThrow(new RuntimeException("test"))
@@ -11570,7 +11570,7 @@ class KafkaApisTest extends Logging {
def testPushTelemetry(): Unit = {
val request = buildRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true).build())
- when(clientMetricsManager.isTelemetryReceiverConfigured).thenReturn(true)
+ when(clientMetricsManager.isTelemetryExporterConfigured).thenReturn(true)
when(clientMetricsManager.processPushTelemetryRequest(any[PushTelemetryRequest](), any[RequestContext]()))
.thenReturn(new PushTelemetryResponse(new PushTelemetryResponseData()))
@@ -11587,7 +11587,7 @@ class KafkaApisTest extends Logging {
def testPushTelemetryWithException(): Unit = {
val request = buildRequest(new PushTelemetryRequest.Builder(new PushTelemetryRequestData(), true).build())
- when(clientMetricsManager.isTelemetryReceiverConfigured).thenReturn(true)
+ when(clientMetricsManager.isTelemetryExporterConfigured).thenReturn(true)
when(clientMetricsManager.processPushTelemetryRequest(any[PushTelemetryRequest](), any[RequestContext]()))
.thenThrow(new RuntimeException("test"))
diff --git a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
index d7343098b4e2a..87d28d06f2f4e 100644
--- a/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
+++ b/server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java
@@ -49,7 +49,7 @@
import org.apache.kafka.server.metrics.ClientMetricsConfigs;
import org.apache.kafka.server.metrics.ClientMetricsInstance;
import org.apache.kafka.server.metrics.ClientMetricsInstanceMetadata;
-import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin;
+import org.apache.kafka.server.metrics.ClientTelemetryExporterPlugin;
import org.apache.kafka.server.network.ConnectionDisconnectListener;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
@@ -90,7 +90,7 @@ public class ClientMetricsManager implements AutoCloseable {
private static final int CACHE_MAX_SIZE = 16384;
private static final int DEFAULT_CACHE_EXPIRY_MS = 60 * 1000;
- private final ClientMetricsReceiverPlugin receiverPlugin;
+ private final ClientTelemetryExporterPlugin clientTelemetryExporterPlugin;
private final Cache clientInstanceCache;
private final Map clientConnectionIdMap;
private final Timer expirationTimer;
@@ -107,13 +107,13 @@ public class ClientMetricsManager implements AutoCloseable {
// to re-evaluate the client instance subscription id as per changed subscriptions.
private final AtomicInteger subscriptionUpdateVersion;
- public ClientMetricsManager(ClientMetricsReceiverPlugin receiverPlugin, int clientTelemetryMaxBytes, Time time, Metrics metrics) {
- this(receiverPlugin, clientTelemetryMaxBytes, time, DEFAULT_CACHE_EXPIRY_MS, metrics);
+ public ClientMetricsManager(ClientTelemetryExporterPlugin clientTelemetryExporterPlugin, int clientTelemetryMaxBytes, Time time, Metrics metrics) {
+ this(clientTelemetryExporterPlugin, clientTelemetryMaxBytes, time, DEFAULT_CACHE_EXPIRY_MS, metrics);
}
// Visible for testing
- ClientMetricsManager(ClientMetricsReceiverPlugin receiverPlugin, int clientTelemetryMaxBytes, Time time, int cacheExpiryMs, Metrics metrics) {
- this.receiverPlugin = receiverPlugin;
+ ClientMetricsManager(ClientTelemetryExporterPlugin clientTelemetryExporterPlugin, int clientTelemetryMaxBytes, Time time, int cacheExpiryMs, Metrics metrics) {
+ this.clientTelemetryExporterPlugin = clientTelemetryExporterPlugin;
this.subscriptionMap = new ConcurrentHashMap<>();
this.subscriptionUpdateVersion = new AtomicInteger(0);
this.clientInstanceCache = new SynchronizedCache<>(new LRUCache<>(CACHE_MAX_SIZE));
@@ -214,7 +214,7 @@ public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest re
if (metrics != null && metrics.limit() > 0) {
try {
long exportTimeStartMs = time.hiResClockMs();
- receiverPlugin.exportMetrics(requestContext, request);
+ clientTelemetryExporterPlugin.exportMetrics(requestContext, request, clientInstance.pushIntervalMs());
clientMetricsStats.recordPluginExport(clientInstanceId, time.hiResClockMs() - exportTimeStartMs);
} catch (Throwable exception) {
clientMetricsStats.recordPluginErrorCount(clientInstanceId);
@@ -228,8 +228,8 @@ public PushTelemetryResponse processPushTelemetryRequest(PushTelemetryRequest re
return new PushTelemetryResponse(new PushTelemetryResponseData());
}
- public boolean isTelemetryReceiverConfigured() {
- return !receiverPlugin.isEmpty();
+ public boolean isTelemetryExporterConfigured() {
+ return !clientTelemetryExporterPlugin.isEmpty();
}
public ConnectionDisconnectListener connectionDisconnectListener() {
diff --git a/server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java b/server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java
index 9eb835ccfbf3b..279c01460dc88 100644
--- a/server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java
+++ b/server/src/main/java/org/apache/kafka/server/DefaultApiVersionManager.java
@@ -78,7 +78,7 @@ public ApiMessageType.ListenerType listenerType() {
public ApiVersionsResponse apiVersionResponse(int throttleTimeMs, boolean alterFeatureLevel0) {
FinalizedFeatures finalizedFeatures = metadataCache.features();
Optional controllerApiVersions = nodeApiVersionsSupplier.get();
- boolean clientTelemetryEnabled = clientMetricsManager.map(ClientMetricsManager::isTelemetryReceiverConfigured).orElse(false);
+ boolean clientTelemetryEnabled = clientMetricsManager.map(ClientMetricsManager::isTelemetryExporterConfigured).orElse(false);
ApiVersionsResponseData.ApiVersionCollection apiVersions = controllerApiVersions
.map(nodeApiVersions -> ApiVersionsResponse.controllerApiVersions(
nodeApiVersions,
diff --git a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java b/server/src/main/java/org/apache/kafka/server/metrics/ClientTelemetryExporterPlugin.java
similarity index 58%
rename from server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java
rename to server/src/main/java/org/apache/kafka/server/metrics/ClientTelemetryExporterPlugin.java
index c608510350ace..0836dd730dab8 100644
--- a/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPlugin.java
+++ b/server/src/main/java/org/apache/kafka/server/metrics/ClientTelemetryExporterPlugin.java
@@ -18,6 +18,7 @@
import org.apache.kafka.common.requests.PushTelemetryRequest;
import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryExporter;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import java.util.ArrayList;
@@ -25,33 +26,51 @@
import java.util.List;
/**
- * Plugin to register client telemetry receivers and export metrics. This class is used by the Kafka
- * server to export client metrics to the registered receivers.
+ * Plugin to register client telemetry receivers/exporters and export metrics. This class is used by the Kafka
+ * server to export client metrics to the registered receivers and exporters, supporting both the deprecated
+ * {@link ClientTelemetryReceiver} and the new {@link ClientTelemetryExporter} interfaces.
*/
-public class ClientMetricsReceiverPlugin {
+@SuppressWarnings({"deprecation", "overloads", "removal"})
+public class ClientTelemetryExporterPlugin {
private final List<ClientTelemetryReceiver> receivers;
+    private final List<ClientTelemetryExporter> exporters;
- public ClientMetricsReceiverPlugin() {
+ public ClientTelemetryExporterPlugin() {
this.receivers = Collections.synchronizedList(new ArrayList<>());
+ this.exporters = Collections.synchronizedList(new ArrayList<>());
}
public boolean isEmpty() {
- return receivers.isEmpty();
+ return receivers.isEmpty() && exporters.isEmpty();
}
public void add(ClientTelemetryReceiver receiver) {
receivers.add(receiver);
}
+ public void add(ClientTelemetryExporter exporter) {
+ exporters.add(exporter);
+ }
+
public DefaultClientTelemetryPayload getPayLoad(PushTelemetryRequest request) {
return new DefaultClientTelemetryPayload(request);
}
- public void exportMetrics(RequestContext context, PushTelemetryRequest request) {
+ public void exportMetrics(RequestContext context, PushTelemetryRequest request, int pushIntervalMs) {
DefaultClientTelemetryPayload payload = getPayLoad(request);
+
+ // Export to deprecated receivers
for (ClientTelemetryReceiver receiver : receivers) {
receiver.exportMetrics(context, payload);
}
+
+ // Export to new exporters with push interval context
+ if (!exporters.isEmpty()) {
+ DefaultClientTelemetryContext telemetryContext = new DefaultClientTelemetryContext(pushIntervalMs, context);
+ for (ClientTelemetryExporter exporter : exporters) {
+ exporter.exportMetrics(telemetryContext, payload);
+ }
+ }
}
}
diff --git a/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryContext.java b/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryContext.java
new file mode 100644
index 0000000000000..282e5ad2d12f3
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/metrics/DefaultClientTelemetryContext.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryContext;
+
+/**
+ * Default implementation of {@link ClientTelemetryContext}.
+ */
+public record DefaultClientTelemetryContext(int pushIntervalMs,
+ AuthorizableRequestContext authorizableRequestContext) implements ClientTelemetryContext {
+
+}
diff --git a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
index c5b13eefbe45b..79b74abd9078b 100644
--- a/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
+++ b/server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java
@@ -33,8 +33,8 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.server.metrics.ClientMetricsConfigs;
import org.apache.kafka.server.metrics.ClientMetricsInstance;
-import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin;
import org.apache.kafka.server.metrics.ClientMetricsTestUtils;
+import org.apache.kafka.server.metrics.ClientTelemetryExporterPlugin;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.test.TestUtils;
@@ -76,7 +76,7 @@ public class ClientMetricsManagerTest {
private MockTime time;
private Metrics kafkaMetrics;
- private ClientMetricsReceiverPlugin clientMetricsReceiverPlugin;
+ private ClientTelemetryExporterPlugin clientTelemetryExporterPlugin;
private ClientMetricsManager clientMetricsManager;
@AfterAll
@@ -94,8 +94,8 @@ public static void ensureNoThreadLeak() throws InterruptedException {
public void setUp() {
time = new MockTime();
kafkaMetrics = new Metrics();
- clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin();
- clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time, 100, kafkaMetrics);
+ clientTelemetryExporterPlugin = new ClientTelemetryExporterPlugin();
+ clientMetricsManager = new ClientMetricsManager(clientTelemetryExporterPlugin, 100, time, 100, kafkaMetrics);
}
@AfterEach
@@ -357,7 +357,7 @@ public void testGetTelemetrySameClientImmediateRetryAfterPushFail() throws Excep
// the one with new client instance.
try (
Metrics kafkaMetrics = new Metrics();
- ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time, kafkaMetrics)
+ ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientTelemetryExporterPlugin, 100, time, kafkaMetrics)
) {
PushTelemetryRequest pushRequest = new Builder(
@@ -597,7 +597,7 @@ public void testPushTelemetryOnNewServer() throws Exception {
// the one with new client instance.
try (
Metrics kafkaMetrics = new Metrics();
- ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time, kafkaMetrics)
+ ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientTelemetryExporterPlugin, 100, time, kafkaMetrics)
) {
PushTelemetryRequest request = new PushTelemetryRequest.Builder(
@@ -889,7 +889,7 @@ public void testPushTelemetryNullMetricsData() throws Exception {
public void testPushTelemetryMetricsTooLarge() throws Exception {
try (
Metrics kafkaMetrics = new Metrics();
- ClientMetricsManager clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 1, time, kafkaMetrics)
+ ClientMetricsManager clientMetricsManager = new ClientMetricsManager(clientTelemetryExporterPlugin, 1, time, kafkaMetrics)
) {
GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
@@ -944,7 +944,7 @@ public void testPushTelemetryConcurrentRequestNewClientInstance() throws Excepti
try (
Metrics kafkaMetrics = new Metrics();
- ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 100, time, kafkaMetrics)
+ ClientMetricsManager newClientMetricsManager = new ClientMetricsManager(clientTelemetryExporterPlugin, 100, time, kafkaMetrics)
) {
Thread thread = new Thread(() -> {
@@ -1078,8 +1078,8 @@ public void testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws E
@Test
public void testPushTelemetryPluginException() throws Exception {
- ClientMetricsReceiverPlugin receiverPlugin = Mockito.mock(ClientMetricsReceiverPlugin.class);
- Mockito.doThrow(new RuntimeException("test exception")).when(receiverPlugin).exportMetrics(Mockito.any(), Mockito.any());
+ ClientTelemetryExporterPlugin receiverPlugin = Mockito.mock(ClientTelemetryExporterPlugin.class);
+ Mockito.doThrow(new RuntimeException("test exception")).when(receiverPlugin).exportMetrics(Mockito.any(), Mockito.any(), Mockito.anyInt());
try (
Metrics kafkaMetrics = new Metrics();
@@ -1198,7 +1198,7 @@ public void testGetTelemetrySubscriptionAfterPushTelemetryUnknownCompressionSucc
public void testGetTelemetrySubscriptionAfterPushTelemetryBytesExceptionFails() throws Exception {
try (
Metrics kafkaMetrics = new Metrics();
- ClientMetricsManager clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, 1, time, kafkaMetrics)
+ ClientMetricsManager clientMetricsManager = new ClientMetricsManager(clientTelemetryExporterPlugin, 1, time, kafkaMetrics)
) {
GetTelemetrySubscriptionsRequest subscriptionsRequest = new GetTelemetrySubscriptionsRequest.Builder(
new GetTelemetrySubscriptionsRequestData(), true).build();
diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
deleted file mode 100644
index 698e872095494..0000000000000
--- a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsReceiverPluginTest.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.server.metrics;
-
-import org.apache.kafka.common.message.PushTelemetryRequestData;
-import org.apache.kafka.common.requests.PushTelemetryRequest;
-import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientMetricsReceiver;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-public class ClientMetricsReceiverPluginTest {
-
- private TestClientMetricsReceiver telemetryReceiver;
- private ClientMetricsReceiverPlugin clientMetricsReceiverPlugin;
-
- @BeforeEach
- public void setUp() {
- telemetryReceiver = new TestClientMetricsReceiver();
- clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin();
- }
-
- @Test
- public void testExportMetrics() throws UnknownHostException {
- assertTrue(clientMetricsReceiverPlugin.isEmpty());
-
- clientMetricsReceiverPlugin.add(telemetryReceiver);
- assertFalse(clientMetricsReceiverPlugin.isEmpty());
-
- assertEquals(0, telemetryReceiver.exportMetricsInvokedCount);
- assertTrue(telemetryReceiver.metricsData.isEmpty());
-
- byte[] metrics = "test-metrics".getBytes(StandardCharsets.UTF_8);
- clientMetricsReceiverPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
- new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build());
-
- assertEquals(1, telemetryReceiver.exportMetricsInvokedCount);
- assertEquals(1, telemetryReceiver.metricsData.size());
- assertEquals(metrics, telemetryReceiver.metricsData.get(0).array());
- }
-}
diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTelemetryPluginTest.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTelemetryPluginTest.java
new file mode 100644
index 0000000000000..958f21a12a256
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTelemetryPluginTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientMetricsReceiver;
+import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientTelemetryExporter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class ClientMetricsTelemetryPluginTest {
+
+ private TestClientMetricsReceiver telemetryReceiver;
+ private ClientTelemetryExporterPlugin clientTelemetryExporterPlugin;
+ private TestClientTelemetryExporter telemetryExporter;
+
+ @BeforeEach
+ public void setUp() {
+ telemetryReceiver = new TestClientMetricsReceiver();
+ telemetryExporter = new TestClientTelemetryExporter();
+ clientTelemetryExporterPlugin = new ClientTelemetryExporterPlugin();
+ }
+
+ @Test
+ public void testExportMetricsWithDeprecatedReceiver() throws UnknownHostException {
+ assertTrue(clientTelemetryExporterPlugin.isEmpty());
+
+ clientTelemetryExporterPlugin.add(telemetryReceiver);
+ assertFalse(clientTelemetryExporterPlugin.isEmpty());
+
+ assertEquals(0, telemetryReceiver.exportMetricsInvokedCount);
+ assertTrue(telemetryReceiver.metricsData.isEmpty());
+
+ byte[] metrics = "test-metrics".getBytes(StandardCharsets.UTF_8);
+ clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
+ new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 5000);
+
+ assertEquals(1, telemetryReceiver.exportMetricsInvokedCount);
+ assertEquals(1, telemetryReceiver.metricsData.size());
+ assertEquals(metrics, telemetryReceiver.metricsData.get(0).array());
+ }
+
+ @Test
+ public void testExportMetricsWithNewExporter() throws UnknownHostException {
+ assertTrue(clientTelemetryExporterPlugin.isEmpty());
+ clientTelemetryExporterPlugin.add(telemetryExporter);
+ assertFalse(clientTelemetryExporterPlugin.isEmpty());
+
+ assertEquals(0, telemetryExporter.exportMetricsInvokedCount);
+ assertTrue(telemetryExporter.metricsData.isEmpty());
+ assertTrue(telemetryExporter.pushIntervals.isEmpty());
+
+ byte[] metrics = "test-metrics-new".getBytes(StandardCharsets.UTF_8);
+ int pushIntervalMs = 10000;
+ clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
+ new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), pushIntervalMs);
+
+ assertEquals(1, telemetryExporter.exportMetricsInvokedCount);
+ assertEquals(1, telemetryExporter.metricsData.size());
+ assertEquals(ByteBuffer.wrap(metrics), telemetryExporter.metricsData.get(0));
+ assertEquals(1, telemetryExporter.pushIntervals.size());
+ assertEquals(pushIntervalMs, telemetryExporter.pushIntervals.get(0));
+ }
+
+ @Test
+ public void testExportMetricsWithBothReceiverAndExporter() throws UnknownHostException {
+ // Test with separate receiver and exporter objects - both should be called
+ clientTelemetryExporterPlugin.add(telemetryReceiver);
+ clientTelemetryExporterPlugin.add(telemetryExporter);
+ assertFalse(clientTelemetryExporterPlugin.isEmpty());
+
+ byte[] metrics = "test-metrics-both".getBytes(StandardCharsets.UTF_8);
+ int pushIntervalMs = 15000;
+ clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
+ new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), pushIntervalMs);
+
+ // Both should be called since they are separate objects
+ assertEquals(1, telemetryReceiver.exportMetricsInvokedCount);
+ assertEquals(1, telemetryExporter.exportMetricsInvokedCount);
+
+ assertEquals(ByteBuffer.wrap(metrics), telemetryReceiver.metricsData.get(0));
+ assertEquals(ByteBuffer.wrap(metrics), telemetryExporter.metricsData.get(0));
+ assertEquals(pushIntervalMs, telemetryExporter.pushIntervals.get(0));
+ }
+
+ @Test
+ public void testExportMetricsWithDualImplementation() throws UnknownHostException {
+ // Test with a class that implements both interfaces
+ // This mimics production behavior in DynamicBrokerConfig where pattern matching
+ // ensures only the exporter is added when a single object implements both interfaces
+ ClientMetricsTestUtils.TestDualImplementation dualImpl = new ClientMetricsTestUtils.TestDualImplementation();
+
+ // In production (DynamicBrokerConfig.scala), when a reporter implements both interfaces,
+ // only the exporter is added due to pattern matching that checks ClientTelemetryExporterProvider first
+ clientTelemetryExporterPlugin.add(dualImpl.clientTelemetryExporter());
+ assertFalse(clientTelemetryExporterPlugin.isEmpty());
+
+ byte[] metrics = "test-metrics-dual".getBytes(StandardCharsets.UTF_8);
+ int pushIntervalMs = 12000;
+ clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
+ new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), pushIntervalMs);
+
+ // Only the exporter should be called (receiver should not be invoked)
+ assertEquals(0, dualImpl.getReceiver().exportMetricsInvokedCount);
+ assertEquals(1, dualImpl.getExporter().exportMetricsInvokedCount);
+
+ assertEquals(ByteBuffer.wrap(metrics), dualImpl.getExporter().metricsData.get(0));
+ assertEquals(pushIntervalMs, dualImpl.getExporter().pushIntervals.get(0));
+ }
+}
diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
index 30c0e5f67b440..e2e8fa2a83ace 100644
--- a/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
+++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientMetricsTestUtils.java
@@ -24,6 +24,10 @@
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
+import org.apache.kafka.server.telemetry.ClientTelemetry;
+import org.apache.kafka.server.telemetry.ClientTelemetryContext;
+import org.apache.kafka.server.telemetry.ClientTelemetryExporter;
+import org.apache.kafka.server.telemetry.ClientTelemetryExporterProvider;
import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import org.apache.kafka.test.TestUtils;
@@ -94,6 +98,7 @@ public static RequestContext requestContextWithConnectionId(String connectionId)
false);
}
+ @SuppressWarnings("deprecation")
public static class TestClientMetricsReceiver implements ClientTelemetryReceiver {
public int exportMetricsInvokedCount = 0;
public List<ByteBuffer> metricsData = new ArrayList<>();
@@ -103,4 +108,50 @@ public void exportMetrics(AuthorizableRequestContext context, ClientTelemetryPay
metricsData.add(payload.data());
}
}
+
+ public static class TestClientTelemetryExporter implements ClientTelemetryExporter {
+ public int exportMetricsInvokedCount = 0;
+ public List<ByteBuffer> metricsData = new ArrayList<>();
+ public List<Integer> pushIntervals = new ArrayList<>();
+
+ @Override
+ public void exportMetrics(ClientTelemetryContext context, ClientTelemetryPayload payload) {
+ exportMetricsInvokedCount += 1;
+ metricsData.add(payload.data());
+ pushIntervals.add(context.pushIntervalMs());
+ }
+ }
+
+ /**
+ * Test implementation that supports both deprecated and new interfaces.
+ * When both are implemented, only the new interface should be used.
+ */
+ @SuppressWarnings("deprecation")
+ public static class TestDualImplementation implements ClientTelemetry, ClientTelemetryExporterProvider {
+ private final TestClientMetricsReceiver receiver;
+ private final TestClientTelemetryExporter exporter;
+
+ public TestDualImplementation() {
+ this.receiver = new TestClientMetricsReceiver();
+ this.exporter = new TestClientTelemetryExporter();
+ }
+
+ @Override
+ public ClientTelemetryReceiver clientReceiver() {
+ return receiver;
+ }
+
+ @Override
+ public ClientTelemetryExporter clientTelemetryExporter() {
+ return exporter;
+ }
+
+ public TestClientMetricsReceiver getReceiver() {
+ return receiver;
+ }
+
+ public TestClientTelemetryExporter getExporter() {
+ return exporter;
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/kafka/server/metrics/ClientTelemetryTest.java b/server/src/test/java/org/apache/kafka/server/metrics/ClientTelemetryTest.java
new file mode 100644
index 0000000000000..c6f5ff2556311
--- /dev/null
+++ b/server/src/test/java/org/apache/kafka/server/metrics/ClientTelemetryTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.metrics;
+
+import org.apache.kafka.common.message.PushTelemetryRequestData;
+import org.apache.kafka.common.requests.PushTelemetryRequest;
+import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientMetricsReceiver;
+import org.apache.kafka.server.metrics.ClientMetricsTestUtils.TestClientTelemetryExporter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ClientTelemetryTest {
+
+ private ClientTelemetryExporterPlugin clientTelemetryExporterPlugin;
+
+ @BeforeEach
+ public void setUp() {
+ clientTelemetryExporterPlugin = new ClientTelemetryExporterPlugin();
+ }
+
+ @Test
+ public void testMultipleDeprecatedReceivers() throws UnknownHostException {
+ // Test that multiple deprecated receivers can be registered
+ TestClientMetricsReceiver receiver1 = new TestClientMetricsReceiver();
+ TestClientMetricsReceiver receiver2 = new TestClientMetricsReceiver();
+
+ clientTelemetryExporterPlugin.add(receiver1);
+ clientTelemetryExporterPlugin.add(receiver2);
+
+ byte[] metrics = "test-metrics-multiple".getBytes(StandardCharsets.UTF_8);
+ clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
+ new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), 5000);
+
+ // Verify both receivers were called
+ assertEquals(1, receiver1.exportMetricsInvokedCount);
+ assertEquals(1, receiver2.exportMetricsInvokedCount);
+ assertEquals(ByteBuffer.wrap(metrics), receiver1.metricsData.get(0));
+ assertEquals(ByteBuffer.wrap(metrics), receiver2.metricsData.get(0));
+ }
+
+ @Test
+ public void testMultipleNewExporters() throws UnknownHostException {
+ // Test that multiple new exporters can be registered
+ TestClientTelemetryExporter exporter1 = new TestClientTelemetryExporter();
+ TestClientTelemetryExporter exporter2 = new TestClientTelemetryExporter();
+
+ clientTelemetryExporterPlugin.add(exporter1);
+ clientTelemetryExporterPlugin.add(exporter2);
+
+ byte[] metrics = "test-metrics-multiple-new".getBytes(StandardCharsets.UTF_8);
+ int pushIntervalMs = 20000;
+ clientTelemetryExporterPlugin.exportMetrics(ClientMetricsTestUtils.requestContext(),
+ new PushTelemetryRequest.Builder(new PushTelemetryRequestData().setMetrics(ByteBuffer.wrap(metrics)), true).build(), pushIntervalMs);
+
+ // Verify both exporters were called
+ assertEquals(1, exporter1.exportMetricsInvokedCount);
+ assertEquals(1, exporter2.exportMetricsInvokedCount);
+ assertEquals(ByteBuffer.wrap(metrics), exporter1.metricsData.get(0));
+ assertEquals(ByteBuffer.wrap(metrics), exporter2.metricsData.get(0));
+ assertEquals(pushIntervalMs, exporter1.pushIntervals.get(0));
+ assertEquals(pushIntervalMs, exporter2.pushIntervals.get(0));
+ }
+}
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index b479446389a9a..121b6d5217adc 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -35,10 +35,8 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
-import org.apache.kafka.server.telemetry.ClientTelemetry;
-import org.apache.kafka.server.telemetry.ClientTelemetryPayload;
-import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
+import org.apache.kafka.server.telemetry.ClientTelemetryExporter;
+import org.apache.kafka.server.telemetry.ClientTelemetryExporterProvider;
import org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.MetricsData;
import org.apache.kafka.streams.ClientInstanceIds;
import org.apache.kafka.streams.KafkaClientSupplier;
@@ -134,7 +132,7 @@ static Stream recordingLevelParameters() {
@BeforeAll
public static void startCluster() throws IOException {
final Properties properties = new Properties();
- properties.put("metric.reporters", TelemetryPlugin.class.getName());
+ properties.put("metric.reporters", TelemetryPluginWithExporter.class.getName());
cluster = new EmbeddedKafkaCluster(NUM_BROKERS, properties);
cluster.start();
}
@@ -192,7 +190,7 @@ public void shouldPushGlobalThreadMetricsToBroker(final String recordingLevel, f
assertNotNull(globalStoreConsumerInstanceId);
LOG.info("Global consumer instance id {}", globalStoreConsumerInstanceId);
TestUtils.waitForCondition(
- () -> !TelemetryPlugin.SUBSCRIBED_METRICS.getOrDefault(globalStoreConsumerInstanceId, Collections.emptyList()).isEmpty(),
+ () -> !TelemetryPluginWithExporter.SUBSCRIBED_METRICS.getOrDefault(globalStoreConsumerInstanceId, Collections.emptyList()).isEmpty(),
30_000,
"Never received subscribed metrics"
);
@@ -205,7 +203,7 @@ public void shouldPushGlobalThreadMetricsToBroker(final String recordingLevel, f
return "org.apache.kafka." + group + "." + name;
}).filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter filters out string metrics
.sorted().toList();
- final List<String> actualGlobalMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(globalStoreConsumerInstanceId));
+ final List<String> actualGlobalMetrics = new ArrayList<>(TelemetryPluginWithExporter.SUBSCRIBED_METRICS.get(globalStoreConsumerInstanceId));
assertEquals(expectedGlobalMetrics, actualGlobalMetrics);
}
}
@@ -222,7 +220,7 @@ public void shouldPushMetricsToBroker(final String recordingLevel, final String
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final ClientInstanceIds clientInstanceIds = streams.clientInstanceIds(Duration.ofSeconds(60));
final Uuid adminInstanceId = clientInstanceIds.adminInstanceId();
-
+
final Uuid mainConsumerInstanceId = clientInstanceIds.consumerInstanceIds().entrySet().stream()
.filter(entry -> !entry.getKey().endsWith("-restore-consumer")
&& !entry.getKey().endsWith("GlobalStreamThread-global-consumer"))
@@ -238,7 +236,7 @@ public void shouldPushMetricsToBroker(final String recordingLevel, final String
.findFirst().orElseThrow();
TestUtils.waitForCondition(
- () -> !TelemetryPlugin.SUBSCRIBED_METRICS.getOrDefault(mainConsumerInstanceId, Collections.emptyList()).isEmpty(),
+ () -> !TelemetryPluginWithExporter.SUBSCRIBED_METRICS.getOrDefault(mainConsumerInstanceId, Collections.emptyList()).isEmpty(),
30_000,
"Never received subscribed metrics"
);
@@ -249,28 +247,28 @@ public void shouldPushMetricsToBroker(final String recordingLevel, final String
return "org.apache.kafka." + group + "." + name;
}).filter(name -> !name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter filters out string metrics
.sorted().toList();
- final List<String> actualMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
+ final List<String> actualMetrics = new ArrayList<>(TelemetryPluginWithExporter.SUBSCRIBED_METRICS.get(mainConsumerInstanceId));
assertEquals(expectedMetrics, actualMetrics);
TestUtils.waitForCondition(
- () -> !TelemetryPlugin.SUBSCRIBED_METRICS.getOrDefault(adminInstanceId, Collections.emptyList()).isEmpty(),
+ () -> !TelemetryPluginWithExporter.SUBSCRIBED_METRICS.getOrDefault(adminInstanceId, Collections.emptyList()).isEmpty(),
30_000,
"Never received subscribed metrics"
);
- final List<String> actualInstanceMetrics = TelemetryPlugin.SUBSCRIBED_METRICS.get(adminInstanceId);
+ final List<String> actualInstanceMetrics = TelemetryPluginWithExporter.SUBSCRIBED_METRICS.get(adminInstanceId);
final List<String> expectedInstanceMetrics = Arrays.asList(
"org.apache.kafka.stream.alive.stream.threads",
"org.apache.kafka.stream.client.state",
"org.apache.kafka.stream.failed.stream.threads",
"org.apache.kafka.stream.recording.level");
-
+
assertEquals(expectedInstanceMetrics, actualInstanceMetrics);
- TestUtils.waitForCondition(() -> TelemetryPlugin.processId != null,
+ TestUtils.waitForCondition(() -> TelemetryPluginWithExporter.processId != null,
30_000,
"Never received the process id");
- assertEquals(expectedProcessId, TelemetryPlugin.processId);
+ assertEquals(expectedProcessId, TelemetryPluginWithExporter.processId);
}
}
@@ -359,7 +357,7 @@ public void shouldPassCorrectMetricsDynamicInstances() throws Exception {
.filter(metricName -> metricName.tags().containsKey("task-id")).toList();
final List<MetricName> streamsOneStateMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();
-
+
final List<MetricName> consumerOnePassedTaskMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
.passedMetrics().stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).toList();
final List<MetricName> consumerOnePassedStateMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
@@ -369,7 +367,7 @@ public void shouldPassCorrectMetricsDynamicInstances() throws Exception {
.filter(metricName -> metricName.tags().containsKey("task-id")).toList();
final List<MetricName> streamsTwoStateMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
.filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();
-
+
final List<MetricName> consumerTwoPassedTaskMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
.passedMetrics().stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).toList();
final List<MetricName> consumerTwoPassedStateMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
@@ -635,11 +633,12 @@ public List passedMetrics() {
}
}
- public static class TelemetryPlugin implements ClientTelemetry, MetricsReporter, ClientTelemetryReceiver {
+ public static class TelemetryPluginWithExporter implements ClientTelemetryExporterProvider, MetricsReporter {
public static final Map<Uuid, List<String>> SUBSCRIBED_METRICS = new ConcurrentHashMap<>();
public static String processId;
- public TelemetryPlugin() {
+
+ public TelemetryPluginWithExporter() {
}
@Override
@@ -656,50 +655,45 @@ public void metricRemoval(final KafkaMetric metric) {
@Override
public void close() {
-
}
@Override
public void configure(final Map<String, ?> configs) {
-
- }
-
- @Override
- public ClientTelemetryReceiver clientReceiver() {
- return this;
}
@Override
- public void exportMetrics(final AuthorizableRequestContext context, final ClientTelemetryPayload payload) {
- try {
- final MetricsData data = MetricsData.parseFrom(payload.data());
-
- final Optional processIdOption = data.getResourceMetricsList()
- .stream()
- .flatMap(rm -> rm.getScopeMetricsList().stream())
- .flatMap(sm -> sm.getMetricsList().stream())
- .map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getGauge)
- .flatMap(gauge -> gauge.getDataPointsList().stream())
- .flatMap(numberDataPoint -> numberDataPoint.getAttributesList().stream())
- .filter(keyValue -> keyValue.getKey().equals("process_id"))
- .map(keyValue -> keyValue.getValue().getStringValue())
- .findFirst();
-
- processIdOption.ifPresent(pid -> processId = pid);
-
- final Uuid clientId = payload.clientInstanceId();
- final List metricNames = data.getResourceMetricsList()
- .stream()
- .flatMap(rm -> rm.getScopeMetricsList().stream())
- .flatMap(sm -> sm.getMetricsList().stream())
- .map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getName)
- .sorted()
- .toList();
- LOG.info("Found metrics {} for clientId={}", metricNames, clientId);
- SUBSCRIBED_METRICS.put(clientId, metricNames);
- } catch (final Exception e) {
- e.printStackTrace(System.err);
- }
+ public ClientTelemetryExporter clientTelemetryExporter() {
+ return (context, payload) -> {
+ try {
+ final MetricsData data = MetricsData.parseFrom(payload.data());
+
+ final Optional processIdOption = data.getResourceMetricsList()
+ .stream()
+ .flatMap(rm -> rm.getScopeMetricsList().stream())
+ .flatMap(sm -> sm.getMetricsList().stream())
+ .map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getGauge)
+ .flatMap(gauge -> gauge.getDataPointsList().stream())
+ .flatMap(numberDataPoint -> numberDataPoint.getAttributesList().stream())
+ .filter(keyValue -> keyValue.getKey().equals("process_id"))
+ .map(keyValue -> keyValue.getValue().getStringValue())
+ .findFirst();
+
+ processIdOption.ifPresent(pid -> processId = pid);
+
+ final Uuid clientId = payload.clientInstanceId();
+ final List metricNames = data.getResourceMetricsList()
+ .stream()
+ .flatMap(rm -> rm.getScopeMetricsList().stream())
+ .flatMap(sm -> sm.getMetricsList().stream())
+ .map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getName)
+ .sorted()
+ .toList();
+ LOG.info("Found metrics {} for clientId={} with pushIntervalMs={}", metricNames, clientId, context.pushIntervalMs());
+ SUBSCRIBED_METRICS.put(clientId, metricNames);
+ } catch (final Exception e) {
+ e.printStackTrace(System.err);
+ }
+ };
}
}
}