Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.linkedin.d2.balancer.zkfs.ZKFSTogglingLoadBalancerFactoryImpl;
import com.linkedin.d2.balancer.zkfs.ZKFSUtil;
import com.linkedin.d2.discovery.event.ServiceDiscoveryEventEmitter;
import com.linkedin.d2.jmx.LoadBalancerStateOtelMetricsProvider;
import com.linkedin.d2.discovery.stores.zk.ZKPersistentConnection;
import com.linkedin.d2.discovery.stores.zk.ZooKeeper;
import com.linkedin.d2.jmx.XdsServerMetricsProvider;
Expand Down Expand Up @@ -237,6 +238,7 @@ public D2Client build()
_config.xdsChannelLoadBalancingPolicyConfig,
_config.subscribeToUriGlobCollection,
_config._xdsServerMetricsProvider,
_config._loadBalancerStateOtelMetricsProvider,
_config.loadBalanceStreamException,
_config.xdsInitialResourceVersionsEnabled,
_config.disableDetectLiRawD2Client,
Expand Down Expand Up @@ -339,6 +341,12 @@ else if (_config.restRetryEnabled || _config.streamRetryEnabled)
return d2Client;
}

/**
 * Stores the given {@link LoadBalancerStateOtelMetricsProvider} on the underlying config
 * ({@code _config._loadBalancerStateOtelMetricsProvider}).
 *
 * @param provider the provider to store on the config
 * @return this builder, so calls can be chained
 */
public D2ClientBuilder setLoadBalancerStateOtelMetricsProvider(LoadBalancerStateOtelMetricsProvider provider)
{
_config._loadBalancerStateOtelMetricsProvider = provider;
return this;
}

/**
* Check if the d2 client builder is to build a LI raw D2 client. When LI container D2ClientFactory is used, it sets
* hostName and d2JmxManagerPrefix with LI-specific values (app name, machine name, etc) at runtime. All LI raw D2
Expand Down
9 changes: 7 additions & 2 deletions d2/src/main/java/com/linkedin/d2/balancer/D2ClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import com.linkedin.d2.jmx.JmxManager;
import com.linkedin.d2.jmx.NoOpXdsServerMetricsProvider;
import com.linkedin.d2.jmx.NoOpJmxManager;
import com.linkedin.d2.jmx.LoadBalancerStateOtelMetricsProvider;
import com.linkedin.d2.jmx.NoOpLoadBalancerStateOtelMetricsProvider;
import com.linkedin.r2.transport.common.TransportClientFactory;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import java.time.Duration;
Expand Down Expand Up @@ -181,6 +183,7 @@ public class D2ClientConfig

public boolean subscribeToUriGlobCollection = false;
public XdsServerMetricsProvider _xdsServerMetricsProvider = new NoOpXdsServerMetricsProvider();
public LoadBalancerStateOtelMetricsProvider _loadBalancerStateOtelMetricsProvider = new NoOpLoadBalancerStateOtelMetricsProvider();
public boolean loadBalanceStreamException = false;
public boolean xdsInitialResourceVersionsEnabled = false;
public Integer xdsStreamMaxRetryBackoffSeconds = null;
Expand Down Expand Up @@ -282,6 +285,7 @@ public D2ClientConfig()
Map<String, ?> xdsChannelLoadBalancingPolicyConfig,
boolean subscribeToUriGlobCollection,
XdsServerMetricsProvider xdsServerMetricsProvider,
LoadBalancerStateOtelMetricsProvider loadBalancerStateOtelMetricsProvider,
boolean loadBalanceStreamException,
boolean xdsInitialResourceVersionsEnabled,
boolean disableDetectLiRawD2Client,
Expand All @@ -294,7 +298,7 @@ public D2ClientConfig()
Boolean enableIndisDownstreamServicesFetcher,
Duration indisDownstreamServicesFetchTimeout)
{
this.zkHosts = zkHosts;
this.zkHosts = zkHosts;
this.xdsServer = xdsServer;
this.hostName = hostName;
this.zkSessionTimeoutInMs = zkSessionTimeoutInMs;
Expand Down Expand Up @@ -366,7 +370,8 @@ public D2ClientConfig()
this.xdsChannelLoadBalancingPolicyConfig = xdsChannelLoadBalancingPolicyConfig;
this.xdsChannelKeepAliveTimeMins = xdsChannelKeepAliveTimeMins;
this.subscribeToUriGlobCollection = subscribeToUriGlobCollection;
this._xdsServerMetricsProvider = xdsServerMetricsProvider;
this._xdsServerMetricsProvider = xdsServerMetricsProvider;
this._loadBalancerStateOtelMetricsProvider = loadBalancerStateOtelMetricsProvider == null ? new NoOpLoadBalancerStateOtelMetricsProvider() : loadBalancerStateOtelMetricsProvider;
this.loadBalanceStreamException = loadBalanceStreamException;
this.xdsInitialResourceVersionsEnabled = xdsInitialResourceVersionsEnabled;
this.disableDetectLiRawD2Client = disableDetectLiRawD2Client;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public LoadBalancerWithFacilities create(D2ClientConfig config)
logAppProps(LOG);
}

D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager, config._loadBalancerStateOtelMetricsProvider);

// init connection
ZKConnectionBuilder zkConnectionBuilder = new ZKConnectionBuilder(config.zkHosts);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ private ZKFSLoadBalancer.TogglingLoadBalancerFactory createLoadBalancerFactory(D
loadBalancerComponentFactory = config.componentFactory;
}

D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.ZK, config.dualReadStateManager, config._loadBalancerStateOtelMetricsProvider);

return new ZKFSTogglingLoadBalancerFactoryImpl(loadBalancerComponentFactory,
config.lbWaitTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public ZKFSTogglingLoadBalancerFactoryImpl(ComponentFactory factory,
new PartitionAccessorRegistryImpl(),
false,
validationStrings -> null,
new D2ClientJmxManager("notSpecified", new NoOpJmxManager()),
new D2ClientJmxManager("notSpecified", new NoOpJmxManager(), D2ClientJmxManager.DiscoverySourceType.ZK, null, null),
ZooKeeperEphemeralStore.DEFAULT_READ_WINDOW_MS);
}

Expand Down
19 changes: 15 additions & 4 deletions d2/src/main/java/com/linkedin/d2/jmx/D2ClientJmxManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState;
import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState.SimpleLoadBalancerStateListener;
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy;
import com.linkedin.d2.jmx.LoadBalancerStateOtelMetricsProvider;
import com.linkedin.d2.jmx.NoOpLoadBalancerStateOtelMetricsProvider;
import com.linkedin.d2.discovery.stores.file.FileStore;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperEphemeralStore;
import com.linkedin.d2.discovery.stores.zk.ZooKeeperPermanentStore;
Expand Down Expand Up @@ -72,6 +74,7 @@ When dual read state manager is null, only one discovery source is working (coul
private final String _secondaryPrefixForLbPropertyJmxName;

private final D2ClientJmxDualReadModeWatcherManager _watcherManager;
private final LoadBalancerStateOtelMetricsProvider _loadBalancerStateOtelMetricsProvider;


public enum DiscoverySourceType
Expand All @@ -94,19 +97,21 @@ public String getPrintName()

public D2ClientJmxManager(String prefix, @Nonnull JmxManager jmxManager)
{
this(prefix, jmxManager, DiscoverySourceType.ZK, null);
this(prefix, jmxManager, DiscoverySourceType.ZK, null, null);
}

public D2ClientJmxManager(String prefix,
@Nonnull JmxManager jmxManager,
@Nonnull DiscoverySourceType discoverySourceType,
@Nullable DualReadStateManager dualReadStateManager)
@Nullable DualReadStateManager dualReadStateManager,
@Nullable LoadBalancerStateOtelMetricsProvider loadBalancerStateOtelMetricsProvider)
{
ArgumentUtil.ensureNotNull(jmxManager,"jmxManager");
_primaryGlobalPrefix = prefix;
_jmxManager = jmxManager;
_discoverySourceType = discoverySourceType;
_dualReadStateManager = dualReadStateManager;
_loadBalancerStateOtelMetricsProvider = loadBalancerStateOtelMetricsProvider == null ? new NoOpLoadBalancerStateOtelMetricsProvider() : loadBalancerStateOtelMetricsProvider;
_secondaryGlobalPrefix = String.format("%s-%s", _primaryGlobalPrefix, _discoverySourceType.getPrintName());
_secondaryPrefixForLbPropertyJmxName = String.format("%s-", _discoverySourceType.getPrintName());
_watcherManager = _dualReadStateManager == null ? new NoOpD2ClientJmxDualReadModeWatcherManagerImpl()
Expand Down Expand Up @@ -317,8 +322,14 @@ private void doRegisterLoadBalancer(SimpleLoadBalancer balancer, @Nullable DualR

private void doRegisterLoadBalancerState(SimpleLoadBalancerState state, @Nullable DualReadModeProvider.DualReadMode mode)
{
final String jmxName = String.format("%s-LoadBalancerState", getGlobalPrefix(mode));
_jmxManager.registerLoadBalancerState(jmxName, state);
final String jmxName = String.format("%s-LoadBalancerState", getGlobalPrefix(mode));
// First call the existing API that tests expect (registerLoadBalancerState with the state)
_jmxManager.registerLoadBalancerState(jmxName, state);
// Then create the JMX bean with OTel provider and set client name before replacing the registration
SimpleLoadBalancerStateJmx bean = new SimpleLoadBalancerStateJmx(state, _loadBalancerStateOtelMetricsProvider);
String clientName = getGlobalPrefix(mode);
bean.setClientName(clientName);
_jmxManager.checkReg(bean, jmxName);
}

private <T> void doRegisterUriFileStore(FileStore<T> uriStore, @Nullable DualReadModeProvider.DualReadMode mode)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.linkedin.d2.jmx;

/**
 * Contract for publishing LoadBalancerState sensor values as OpenTelemetry metrics.
 *
 * <p>Every measurement is reported together with the name of the client it belongs to.
 */
public interface LoadBalancerStateOtelMetricsProvider
{
  /**
   * Reports the regular and the symlink cluster counts of one client in a single call.
   *
   * @param clientName the client name
   * @param regularClusterCount number of regular clusters
   * @param symlinkClusterCount number of symlink clusters
   */
  void recordClusterCount(String clientName, long regularClusterCount, long symlinkClusterCount);

  /**
   * Reports the service count of one client.
   *
   * @param clientName the client name
   * @param serviceCount number of services
   */
  void recordServiceCount(String clientName, long serviceCount);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.linkedin.d2.jmx;

/**
 * {@link LoadBalancerStateOtelMetricsProvider} whose methods do nothing.
 * Serves as the provider when OpenTelemetry metrics are disabled.
 */
public class NoOpLoadBalancerStateOtelMetricsProvider implements LoadBalancerStateOtelMetricsProvider
{
  /** Ignores the reported cluster counts. */
  @Override
  public void recordClusterCount(String clientName, long regularClusterCount, long symlinkClusterCount)
  {
    // intentionally empty
  }

  /** Ignores the reported service count. */
  @Override
  public void recordServiceCount(String clientName, long serviceCount)
  {
    // intentionally empty
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,46 @@
public class SimpleLoadBalancerStateJmx implements SimpleLoadBalancerStateJmxMBean
{
private final SimpleLoadBalancerState _state;
private final LoadBalancerStateOtelMetricsProvider _otelMetricsProvider;
private String _clientName = "-";

/**
 * Creates the bean with a {@link NoOpLoadBalancerStateOtelMetricsProvider}, by delegating to the
 * two-argument constructor.
 *
 * @param state the load balancer state this bean reads from
 * @deprecated use the constructor that also takes a {@link LoadBalancerStateOtelMetricsProvider}
 */
@Deprecated
public SimpleLoadBalancerStateJmx(SimpleLoadBalancerState state)
{
this(state, new NoOpLoadBalancerStateOtelMetricsProvider());
}

/**
 * Creates the bean over the given state and metrics provider.
 *
 * @param state the load balancer state this bean reads from
 * @param otelMetricsProvider provider to report counts to; when {@code null}, a
 *                            {@link NoOpLoadBalancerStateOtelMetricsProvider} is used instead
 */
public SimpleLoadBalancerStateJmx(SimpleLoadBalancerState state, LoadBalancerStateOtelMetricsProvider otelMetricsProvider)
{
  _state = state;
  if (otelMetricsProvider != null)
  {
    _otelMetricsProvider = otelMetricsProvider;
  }
  else
  {
    // fall back to a provider that drops everything so callers never hit a null
    _otelMetricsProvider = new NoOpLoadBalancerStateOtelMetricsProvider();
  }
}

/**
 * Sets the client name passed to the metrics provider.
 *
 * @param clientName the client name; {@code null} is stored as {@code "-"}
 */
public void setClientName(String clientName)
{
  if (clientName == null)
  {
    _clientName = "-";
    return;
  }
  _clientName = clientName;
}

/**
 * Returns the client name currently stored on this bean.
 *
 * @return the stored client name
 */
public String getClientName()
{
return _clientName;
}

@Override
public int getClusterCount()
{
return _state.getClusterCount();
int regular = _state.getClusterCount();
long symlink = _state.getClusters().stream().filter(SymlinkUtil::isSymlinkNodeOrPath).count();
_otelMetricsProvider.recordClusterCount(_clientName, (long) regular, symlink);
return regular;
}

@Override
public long getSymlinkClusterCount()
{
return _state.getClusters().stream().filter(SymlinkUtil::isSymlinkNodeOrPath).count();
long symlink = _state.getClusters().stream().filter(SymlinkUtil::isSymlinkNodeOrPath).count();
_otelMetricsProvider.recordClusterCount(_clientName, (long) _state.getClusterCount(), symlink);
return symlink;
}

@Override
Expand All @@ -63,7 +87,9 @@ public int getListenerCount()
@Override
public int getServiceCount()
{
return _state.getServiceCount();
int count = _state.getServiceCount();
_otelMetricsProvider.recordServiceCount(_clientName, (long) count);
return count;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public boolean isIndisOnly()
public LoadBalancerWithFacilities create(D2ClientConfig config)
{
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(config.d2JmxManagerPrefix, config.jmxManager,
D2ClientJmxManager.DiscoverySourceType.XDS, config.dualReadStateManager);
D2ClientJmxManager.DiscoverySourceType.XDS, config.dualReadStateManager, config._loadBalancerStateOtelMetricsProvider);

if (config.dualReadStateManager != null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,11 @@ D2ClientJmxManager getD2ClientJmxManager(String prefix, D2ClientJmxManager.Disco
{
if (sourceType == null)
{ // default to ZK source type, null dualReadStateManager
_d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager);
_d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager, D2ClientJmxManager.DiscoverySourceType.ZK, null, null);
}
else
{
_d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager, sourceType, isDualReadLB ? _dualReadStateManager : null);
_d2ClientJmxManager = new D2ClientJmxManager(prefix, _jmxManager, sourceType, isDualReadLB ? _dualReadStateManager : null, null);
}
return _d2ClientJmxManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private D2Client getD2Client(LoadBalancerWithFacilitiesFactory lbWithFacilitiesF
private void testD2ClientJmxManagerRegisteringStrategies()
{
JmxManager mockJmxManager = mock(JmxManager.class);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(DUMMY_STRING, mockJmxManager);
D2ClientJmxManager d2ClientJmxManager = new D2ClientJmxManager(DUMMY_STRING, mockJmxManager, D2ClientJmxManager.DiscoverySourceType.ZK, null, null);

SimpleLoadBalancerState simpleLoadBalancerState = mock(SimpleLoadBalancerState.class);
d2ClientJmxManager.setSimpleLoadBalancerState(simpleLoadBalancerState);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.linkedin.d2.jmx;

import static org.mockito.Mockito.*;
import static org.testng.Assert.*;

import com.linkedin.d2.balancer.simple.SimpleLoadBalancerState;
import java.util.Arrays;
import java.util.HashSet;
import org.testng.annotations.Test;

/**
 * Tests for {@link SimpleLoadBalancerStateJmx}: the values returned by its count getters and the
 * calls those getters make on the {@link LoadBalancerStateOtelMetricsProvider}.
 */
public class TestSimpleLoadBalancerStateJmx
{
  @Test
  public void testDefaultNoOpProviderDoesNotThrow()
  {
    SimpleLoadBalancerState lbState = stubState(3, 5, "a", "$b");

    // single-argument constructor: no provider supplied by the caller
    SimpleLoadBalancerStateJmx bean = new SimpleLoadBalancerStateJmx(lbState);

    assertEquals(bean.getClusterCount(), 3);
    assertEquals(bean.getSymlinkClusterCount(), 1L);
    assertEquals(bean.getServiceCount(), 5);
  }

  @Test
  public void testWithMockProviderReceivesCallbacksAndClientName()
  {
    SimpleLoadBalancerState lbState = stubState(2, 7, "c1", "$c2");
    LoadBalancerStateOtelMetricsProvider recorder = mock(LoadBalancerStateOtelMetricsProvider.class);
    SimpleLoadBalancerStateJmx bean = new SimpleLoadBalancerStateJmx(lbState, recorder);

    // no client name set yet, so the default "-" is reported
    readAllCounts(bean);

    // both cluster getters report cluster counts, hence two invocations
    verify(recorder, times(2)).recordClusterCount("-", 2L, 1L);
    verify(recorder, times(1)).recordServiceCount("-", 7L);

    // after setting a client name, the next round of getter calls must carry it
    bean.setClientName("MyClient");
    readAllCounts(bean);

    verify(recorder, times(2)).recordClusterCount("MyClient", 2L, 1L);
    verify(recorder, times(1)).recordServiceCount("MyClient", 7L);
  }

  /**
   * Builds a mocked {@link SimpleLoadBalancerState} stubbed with the given counts and cluster names.
   */
  private static SimpleLoadBalancerState stubState(int clusterCount, int serviceCount, String... clusterNames)
  {
    SimpleLoadBalancerState stub = mock(SimpleLoadBalancerState.class);
    when(stub.getClusterCount()).thenReturn(clusterCount);
    when(stub.getClusters()).thenReturn(new HashSet<>(Arrays.asList(clusterNames)));
    when(stub.getServiceCount()).thenReturn(serviceCount);
    return stub;
  }

  /**
   * Calls the cluster, symlink-cluster and service count getters once each, in that order.
   */
  private static void readAllCounts(SimpleLoadBalancerStateJmx bean)
  {
    bean.getClusterCount();
    bean.getSymlinkClusterCount();
    bean.getServiceCount();
  }
}