Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,8 @@
<allow pkg="org.apache.kafka.server.config" />
<allow class="org.apache.kafka.server.authorizer.AuthorizableRequestContext" />
<allow class="org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.MetricsData"/>
<allow class="org.apache.kafka.server.telemetry.ClientTelemetry" />
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryPayload" />
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryReceiver" />
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryExporter" />
<allow class="org.apache.kafka.server.telemetry.ClientTelemetryExporterProvider" />
<allow class="org.apache.kafka.storage.internals.log.CleanerConfig" />
<allow class="org.apache.kafka.coordinator.transaction.TransactionLogConfig" />
<allow pkg="org.apache.kafka.coordinator.group" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.telemetry.ClientTelemetry;
import org.apache.kafka.server.telemetry.ClientTelemetryReceiver;
import org.apache.kafka.server.telemetry.ClientTelemetryExporter;
import org.apache.kafka.server.telemetry.ClientTelemetryExporterProvider;

import java.time.Duration;
import java.util.ArrayList;
Expand All @@ -67,10 +67,10 @@
public class ClientTelemetryTest {

@ClusterTest(
types = Type.KRAFT,
types = Type.KRAFT,
brokers = 3,
serverProperties = {
@ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "org.apache.kafka.clients.admin.ClientTelemetryTest$GetIdClientTelemetry"),
@ClusterConfigProperty(key = METRIC_REPORTER_CLASSES_CONFIG, value = "org.apache.kafka.clients.admin.ClientTelemetryTest$TelemetryExporter"),
})
public void testClientInstanceId(ClusterInstance clusterInstance) throws InterruptedException, ExecutionException {
Map<String, Object> configs = new HashMap<>();
Expand Down Expand Up @@ -156,14 +156,8 @@ private static String[] toArray(Collection<List<String>> lists) {
return lists.stream().flatMap(List::stream).toArray(String[]::new);
}

/**
* We should add a ClientTelemetry into plugins to test the clientInstanceId method Otherwise the
* {@link org.apache.kafka.common.protocol.ApiKeys#GET_TELEMETRY_SUBSCRIPTIONS} command will not be supported
* by the server
**/
@SuppressWarnings("unused")
public static class GetIdClientTelemetry implements ClientTelemetry, MetricsReporter {

public static class TelemetryExporter implements ClientTelemetryExporterProvider, MetricsReporter {

@Override
public void init(List<KafkaMetric> metrics) {
Expand All @@ -186,7 +180,7 @@ public void configure(Map<String, ?> configs) {
}

@Override
public ClientTelemetryReceiver clientReceiver() {
public ClientTelemetryExporter clientTelemetryExporter() {
return (context, payload) -> {
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@
/**
* A {@link MetricsReporter} may implement this interface to indicate support for collecting client
* telemetry on the server side.
*
* @deprecated Since 4.2.0, use {@link ClientTelemetryExporterProvider} instead. This interface will be
* removed in Kafka 5.0.0. The new interface provides a {@link ClientTelemetryExporter}
* which includes additional context such as the push interval.
*/
@Deprecated(since = "4.2", forRemoval = true)
@SuppressWarnings("removal")
public interface ClientTelemetry {

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.server.telemetry;

import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

/**
* {@code ClientTelemetryContext} provides context information for client telemetry requests,
* including the push interval.
*/
public interface ClientTelemetryContext {

/**
* Returns the interval at which the client pushes telemetry metrics to the broker.
* This is the interval from the subscription.
* <p>
* Note that for the initial metric push and pushes following a subscription update
* or error, a jitter (between 0.5x and 1.5x of this interval) is applied to avoid
* multiple clients sending requests simultaneously.
* <p>
* This value can be used by metrics exporters to determine when metrics should be
* considered stale or expired.
*
* @return the push interval in milliseconds from the subscription
*/
int pushIntervalMs();

/**
* Returns the authorization context for the client request.
*
* @return the client request context for the corresponding {@code PushTelemetryRequest} API call
*/
AuthorizableRequestContext authorizableRequestContext();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.server.telemetry;

/**
* {@code ClientTelemetryExporter} defines the behavior for telemetry exporters on the broker side
* which receive and export client telemetry metrics and provides additional context including the
* push interval.
*/
public interface ClientTelemetryExporter {

/**
* Called by the broker when a client reports telemetry metrics. The telemetry context
* includes the push interval and authorization details which can be used by the metrics
* exporter to manage metric lifecycle and retrieval of additional client information.
* <p>
* This method may be called from the request handling thread, and as such should avoid blocking.
*
* @param context the client telemetry context including push interval and request authorization context
* @param payload the encoded telemetry payload as sent by the client
*/
void exportMetrics(ClientTelemetryContext context, ClientTelemetryPayload payload);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.server.telemetry;

import org.apache.kafka.common.metrics.MetricsReporter;

/**
* A {@link MetricsReporter} may implement this interface to indicate support for collecting client
* telemetry on the server side using the new exporter API.
*/
public interface ClientTelemetryExporterProvider {

/**
* Called by the broker to fetch instance of {@link ClientTelemetryExporter}.
* <p>
* This instance may be cached by the broker.
*
* @return broker side instance of {@link ClientTelemetryExporter}
*/
ClientTelemetryExporter clientTelemetryExporter();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import java.nio.ByteBuffer;

/**
* A client telemetry payload as sent by the client to the telemetry receiver. The payload is
* received by the broker's {@link ClientTelemetryReceiver} implementation.
* A client telemetry payload as sent by the client to the telemetry exporter. The payload is
* exported using a {@link ClientTelemetryExporter}.
*/
public interface ClientTelemetryPayload {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
/**
* {@code ClientTelemetryReceiver} defines the behaviour for telemetry receiver on the broker side
* which receives client telemetry metrics.
*
* @deprecated Since 4.2.0, use {@link ClientTelemetryExporter} instead. This interface will be
* removed in Kafka 5.0.0. The new interface provides additional context including
* the push interval to enable better metric lifecycle management.
*/
@Deprecated(since = "4.2", forRemoval = true)
public interface ClientTelemetryReceiver {
/**
* Called by the broker when a client reports telemetry metrics. The associated request context
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType, DelegationTokenManagerConfigs}
import org.apache.kafka.server.log.remote.storage.{RemoteLogManager, RemoteLogManagerConfig}
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, KafkaYammerMetrics}
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures, KafkaAuthorizerServerInfo}
import org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpStatePersister, Persister, PersisterStateManager}
import org.apache.kafka.server.share.session.ShareSessionCache
Expand Down Expand Up @@ -188,9 +188,9 @@ class BrokerServer(

info("Starting broker")

val clientMetricsReceiverPlugin = new ClientMetricsReceiverPlugin()
val clientTelemetryExporterPlugin = new ClientTelemetryExporterPlugin()

config.dynamicConfig.initialize(Some(clientMetricsReceiverPlugin))
config.dynamicConfig.initialize(Some(clientTelemetryExporterPlugin))
quotaManagers = QuotaFactory.instantiate(config, metrics, time, s"broker-${config.nodeId}-", ProcessRole.BrokerRole.toString)
DynamicBrokerConfig.readDynamicBrokerConfigsFromSnapshot(raftManager, config, quotaManagers, logContext)

Expand Down Expand Up @@ -243,7 +243,7 @@ class BrokerServer(
)
clientToControllerChannelManager.start()
forwardingManager = new ForwardingManagerImpl(clientToControllerChannelManager, metrics)
clientMetricsManager = new ClientMetricsManager(clientMetricsReceiverPlugin, config.clientTelemetryMaxBytes, time, metrics)
clientMetricsManager = new ClientMetricsManager(clientTelemetryExporterPlugin, config.clientTelemetryMaxBytes, time, metrics)

val apiVersionManager = new DefaultApiVersionManager(
ListenerType.BROKER,
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ class ControllerServer(
try {
this.logIdent = logContext.logPrefix()
info("Starting controller")
config.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)
config.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None)

maybeChangeStatus(STARTING, STARTED)

Expand Down
40 changes: 22 additions & 18 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
import org.apache.kafka.server.common.ApiMessageAndVersion
import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, MetricConfigs}
import org.apache.kafka.server.telemetry.ClientTelemetry
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, MetricConfigs}
import org.apache.kafka.server.telemetry.{ClientTelemetry, ClientTelemetryExporterProvider}
import org.apache.kafka.snapshot.RecordsSnapshotReader
import org.apache.kafka.storage.internals.log.{LogCleaner, LogConfig}

Expand Down Expand Up @@ -259,12 +259,12 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private[server] val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var metricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin] = _
private var telemetryExporterPluginOpt: Option[ClientTelemetryExporterPlugin] = _
private var currentConfig: KafkaConfig = _

private[server] def initialize(clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
private[server] def initialize(clientTelemetryExporterPluginOpt: Option[ClientTelemetryExporterPlugin]): Unit = {
currentConfig = new KafkaConfig(kafkaConfig.props, false)
metricsReceiverPluginOpt = clientMetricsReceiverPluginOpt
telemetryExporterPluginOpt = clientTelemetryExporterPluginOpt
}

/**
Expand Down Expand Up @@ -374,8 +374,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
dynamicDefaultConfigs.clone()
}

private[server] def clientMetricsReceiverPlugin: Option[ClientMetricsReceiverPlugin] = CoreUtils.inReadLock(lock) {
metricsReceiverPluginOpt
private[server] def clientTelemetryExporterPlugin: Option[ClientTelemetryExporterPlugin] = CoreUtils.inReadLock(lock) {
telemetryExporterPluginOpt
}

private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) {
Expand Down Expand Up @@ -841,18 +841,22 @@ class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Me
reporters.forEach { reporter =>
metrics.addReporter(reporter)
currentReporters += reporter.getClass.getName -> reporter
val clientTelemetryReceiver = reporter match {
case telemetry: ClientTelemetry => telemetry.clientReceiver()
case _ => null
}

if (clientTelemetryReceiver != null) {
dynamicConfig.clientMetricsReceiverPlugin match {
case Some(receiverPlugin) =>
receiverPlugin.add(clientTelemetryReceiver)
case None =>
// Do nothing
}
// Support both deprecated ClientTelemetry and new ClientTelemetryExporterProvider interfaces
// If a class implements both, only use the new (i.e., ClientTelemetryExporterProvider interface)
dynamicConfig.clientTelemetryExporterPlugin match {
case Some(telemetryExporterPlugin) =>
reporter match {
case exporterProvider: ClientTelemetryExporterProvider =>
// Use new interface (i.e., takes precedence even if class also implements deprecated interface)
telemetryExporterPlugin.add(exporterProvider.clientTelemetryExporter())
case telemetry: ClientTelemetry =>
telemetryExporterPlugin.add(telemetry.clientReceiver())
case _ =>
// Reporter doesn't support client telemetry
}
case None =>
// Do nothing
}
}
KafkaBroker.notifyClusterListeners(clusterId, reporters.asScala)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/SharedServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class SharedServer(
// This is only done in tests.
metrics = new Metrics()
}
sharedServerConfig.dynamicConfig.initialize(clientMetricsReceiverPluginOpt = None)
sharedServerConfig.dynamicConfig.initialize(clientTelemetryExporterPluginOpt = None)

if (sharedServerConfig.processRoles.contains(ProcessRole.BrokerRole)) {
brokerMetrics = new BrokerServerMetrics(metrics)
Expand Down
Loading
Loading