Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,23 @@ public static class Collector {
* How long grpc client will timeout in sending data to upstream.
*/
public static int GRPC_UPSTREAM_TIMEOUT = 30;
/**
* The interval in seconds between keepalive pings sent to the backend.
* If this is less than or equal to 0, keepalive is disabled.
*
* <p>
* <b>Note:</b> The minimum safe value is 10 seconds. Values below this may be rejected by the gRPC server.
*
* <p>
* This maps to {@code collector.grpc_keepalive_time} in agent.config.
*/
public static long GRPC_KEEPALIVE_TIME = 120L;
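/**
* The timeout in seconds to wait for a keepalive ack from the backend.
* If the ack is not received within this time, the connection is considered dead.
* This setting is only applied when keepalive is enabled, i.e. when GRPC_KEEPALIVE_TIME is greater than 0.
*
* <p>
* This maps to {@code collector.grpc_keepalive_timeout} in agent.config.
*/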
public static long GRPC_KEEPALIVE_TIMEOUT = 30L;
/**
* Get profile task list interval
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.grpc.netty.NettyChannelBuilder;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.conf.Config;

public class GRPCChannel {
/**
Expand All @@ -39,6 +41,12 @@ private GRPCChannel(String host, int port, List<ChannelBuilder> channelBuilders,
List<ChannelDecorator> decorators) throws Exception {
ManagedChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port);

if (Config.Collector.GRPC_KEEPALIVE_TIME > 0) {
channelBuilder.keepAliveTime(Config.Collector.GRPC_KEEPALIVE_TIME, TimeUnit.SECONDS)
.keepAliveTimeout(Config.Collector.GRPC_KEEPALIVE_TIMEOUT, TimeUnit.SECONDS)
.keepAliveWithoutCalls(true);
}

NameResolverRegistry.getDefaultRegistry().register(new DnsNameResolverProvider());

for (ChannelBuilder builder : channelBuilders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class GRPCChannelManager implements BootService, Runnable {
private volatile List<String> grpcServers;
private volatile int selectedIdx = -1;
private volatile int reconnectCount = 0;
private final Object statusLock = new Object();

@Override
public void prepare() {
Expand Down Expand Up @@ -99,7 +100,10 @@ public void shutdown() {

@Override
public void run() {
LOGGER.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
if (reconnect) {
LOGGER.warn("Selected collector grpc service running, reconnect:{}.", reconnect);
}

if (IS_RESOLVE_DNS_PERIODICALLY && reconnect) {
grpcServers = Arrays.stream(Config.Collector.BACKEND_SERVICE.split(","))
.filter(StringUtil::isNotBlank)
Expand Down Expand Up @@ -130,32 +134,28 @@ public void run() {
String server = "";
try {
int index = Math.abs(random.nextInt()) % grpcServers.size();
server = grpcServers.get(index);
String[] ipAndPort = server.split(":");

if (index != selectedIdx) {
selectedIdx = index;
LOGGER.debug("Connecting to different gRPC server {}. Shutting down existing channel if any.", server);
createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
} else {
// Same server, increment reconnectCount
reconnectCount++;

server = grpcServers.get(index);
String[] ipAndPort = server.split(":");

if (managedChannel != null) {
managedChannel.shutdownNow();
if (reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD) {
// Reconnect attempts exceeded threshold, force rebuild channel
LOGGER.warn("Reconnect attempts to {} exceeded threshold ({}), forcing channel rebuild",
server, Config.Agent.FORCE_RECONNECTION_PERIOD);
createNewChannel(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
} else if (managedChannel.isConnected(false)) {
// Channel appears connected, trust it but keep reconnectCount for monitoring
LOGGER.debug("Channel to {} appears connected (reconnect attempt: {})", server, reconnectCount);
notifyConnected();
}

managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
.addManagedChannelBuilder(new StandardChannelBuilder())
.addManagedChannelBuilder(new TLSChannelBuilder())
.addChannelDecorator(new AgentIDDecorator())
.addChannelDecorator(new AuthenticationDecorator())
.build();
reconnectCount = 0;
reconnect = false;
notify(GRPCChannelStatus.CONNECTED);
} else if (managedChannel.isConnected(++reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD)) {
// Reconnect to the same server is automatically done by GRPC,
// therefore we are responsible to check the connectivity and
// set the state and notify listeners
reconnectCount = 0;
reconnect = false;
notify(GRPCChannelStatus.CONNECTED);
// else: Channel is disconnected and under threshold, wait for next retry
}

return;
Expand Down Expand Up @@ -184,8 +184,7 @@ public Channel getChannel() {
*/
public void reportError(Throwable throwable) {
    // Only network-level failures should push the manager into reconnect mode.
    if (isNetworkError(throwable)) {
        // triggerReconnect() sets the reconnect flag and notifies DISCONNECT while holding
        // statusLock. Doing the same inline here as well would notify listeners twice,
        // with the first notification happening outside the lock.
        triggerReconnect();
    }
}

Expand All @@ -199,6 +198,49 @@ private void notify(GRPCChannelStatus status) {
}
}

/**
 * Replaces the managed channel with a freshly built one targeting the given server.
 * <p>
 * Any channel currently held is shut down first. After the new channel is built,
 * {@code reconnectCount} is set back to 0 and {@link #notifyConnected()} is invoked.
 *
 * @param host host name or IP of the backend to connect to
 * @param port port of the backend to connect to
 * @throws Exception if building the new channel fails
 */
private void createNewChannel(String host, int port) throws Exception {
    // Drop the channel we are holding (if there is one) before building its replacement.
    if (managedChannel != null) {
        managedChannel.shutdownNow();
    }

    // Same builder/decorator chain for every rebuild: standard + TLS channel builders,
    // then the agent-id and authentication decorators.
    managedChannel = GRPCChannel.newBuilder(host, port)
                                .addManagedChannelBuilder(new StandardChannelBuilder())
                                .addManagedChannelBuilder(new TLSChannelBuilder())
                                .addChannelDecorator(new AgentIDDecorator())
                                .addChannelDecorator(new AuthenticationDecorator())
                                .build();

    // The counter is cleared only here, i.e. only once a channel has really been rebuilt.
    reconnectCount = 0;
    notifyConnected();
}

/**
 * Flags the manager as needing a reconnect and broadcasts
 * {@link GRPCChannelStatus#DISCONNECT} to the listeners.
 * Both steps are performed while holding {@code statusLock}.
 */
private void triggerReconnect() {
    synchronized (statusLock) {
        // Raise the flag before listeners are told, so the two always change together.
        reconnect = true;
        notify(GRPCChannelStatus.DISCONNECT);
    }
}

/**
* Notify listeners that connection is established without resetting reconnectCount.
* This is used when the channel appears connected but we want to keep monitoring
* reconnect attempts in case it's a false positive (half-open connection).
*/
private void notifyConnected() {
synchronized (statusLock) {
// Don't reset reconnectCount - connection might still be half-open
Comment on lines +231 to +238
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little confused about that. If you need to check for a half-open connection, why not check for it directly?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't there a way you can determine whether the server is reachable? I am a little confused. There is still no TRANSIENT_FAILURE status check in your code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't found a reliable way to directly detect half-open connections before they cause issues. isConnected() can return READY even when the connection is actually half-open on the server side.

What I observed:
When a half-open connection exists, gRPC's keepalive mechanism will eventually detect it - the keepalive PING will fail and trigger an UNAVAILABLE exception. However, this detection can take time (depending on keepalive settings).

The change:
The key issue with the original code was that reconnectCount was reset whenever isConnected() returned true:

// Original code
} else if (managedChannel.isConnected(++reconnectCount > 5)) {
    reconnectCount = 0;  // Reset here - problem!
    reconnect = false;
}

When isConnected() returns true (even for a half-open connection), the counter resets, so it never reaches the threshold to force a channel rebuild.

Now:

if (reconnectCount > Config.Agent.FORCE_RECONNECTION_PERIOD) {
    createNewChannel(...);  // Force rebuild
} else if (managedChannel.isConnected(false)) {
    notifyConnected();  // Don't reset reconnectCount
}

reconnectCount only resets in createNewChannel() after actually rebuilding the channel. This ensures that even if isConnected() gives false positives, we'll eventually force a rebuild.

Do you think there's a better approach to detect half-open connections directly?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think once any request is made successfully, the channel is good.
Keeping reconnectCount un-reset while connected is set to true seems strange.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you check whether the connection status is TRANSIENT_FAILURE? isConnected checks status==READY. When the connection is half-closed, it should not be in READY status.

Could you recheck the logic? This change and its context do not seem consistent with gRPC concepts.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't you just add the TRANSIENT_FAILURE status to the isNetworkError method? Then a reconnection could be triggered when the status changes.
If no error is received, then I don't think it is in TRANSIENT_FAILURE (or half-closed).

Please check the runtime more.

reconnect = false;
notify(GRPCChannelStatus.CONNECTED);
}
}

private boolean isNetworkError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) throwable;
Expand Down
7 changes: 7 additions & 0 deletions apm-sniffer/config/agent.config
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,13 @@ collector.properties_report_period_factor=${SW_AGENT_COLLECTOR_PROPERTIES_REPORT
collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
# How long grpc client will timeout in sending data to upstream. Unit is second.
collector.grpc_upstream_timeout=${SW_AGENT_COLLECTOR_GRPC_UPSTREAM_TIMEOUT:30}
# The interval in seconds to send a keepalive ping to the backend.
# If this is less than or equal to 0, the keepalive is disabled.
# Note: The minimum safe value is 10 seconds. Values below this may be rejected by the gRPC server.
#collector.grpc_keepalive_time=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIME:120}
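# The timeout in seconds to wait for a keepalive ack from the backend.
# If the ack is not received within this time, the connection is considered dead.
# This setting is only applied when keepalive is enabled (collector.grpc_keepalive_time greater than 0).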
#collector.grpc_keepalive_timeout=${SW_AGENT_COLLECTOR_GRPC_KEEPALIVE_TIMEOUT:30}
# Sniffer get profile task list interval.
collector.get_profile_task_interval=${SW_AGENT_COLLECTOR_GET_PROFILE_TASK_INTERVAL:20}
# Sniffer get agent dynamic config interval.
Expand Down