diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java index a08a66f5a..31e6d24fc 100644 --- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java +++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerBootstrap.java @@ -42,14 +42,13 @@ import com.github.rholder.retry.WaitStrategies; import com.google.common.base.Predicate; import java.lang.annotation.Annotation; -import java.util.Collection; -import java.util.Date; -import java.util.Map; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Resource; import javax.ws.rs.Path; import javax.ws.rs.ext.Provider; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.glassfish.jersey.server.ResourceConfig; import org.springframework.beans.factory.annotation.Autowired; @@ -233,15 +232,36 @@ private void renewNode() { metaServerConfig.getSchedulerHeartbeatIntervalSecs() * 1000); } + /** + * Opens and starts the Bolt session register server if it hasn't been started. + * + *
Merges the built-in sessionServerHandlers with any handlers returned by
+ * {@link #customSessionServerHandlers()}, opens a server bound to the local address and the
+ * configured session port via {@code boltExchange}, and stores the server instance in
+ * {@code sessionServer}. The method sets the internal started flag to prevent reinitialization.
+ *
+ * @throws RuntimeException if the server fails to open; the started flag is reset before
+ * rethrowing.
+ */
private void openSessionRegisterServer() {
try {
if (rpcServerForSessionStarted.compareAndSet(false, true)) {
+ List This method is idempotent: it uses an atomic compare-and-set to ensure the server is opened only once.
+ * On success, the created server is stored in the `dataServer` field and the started flag is set.
+ *
+ * @throws RuntimeException if the server fails to open; the underlying exception is wrapped.
+ */
private void openDataRegisterServer() {
try {
if (rpcServerForDataStarted.compareAndSet(false, true)) {
diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java
index a8cf59645..215be0791 100644
--- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java
+++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/bootstrap/MetaServerConfiguration.java
@@ -410,10 +410,25 @@ public MetricsResource metricsResource() {
return new MetricsResource();
}
+ /**
+ * Creates and exposes a RegistryCoreOpsResource Spring bean.
+ *
+ * @return a new {@link RegistryCoreOpsResource} instance managed by the Spring container
+ */
@Bean
public RegistryCoreOpsResource registryCoreOpsResource() {
return new RegistryCoreOpsResource();
}
+
+ /**
+ * Creates and registers a DataCenterResource as a Spring bean.
+ *
+ * Exposes a new instance of DataCenterResource for dependency injection in the application context.
+ */
+ @Bean
+ public DataCenterResource dataCenterResource() {
+ return new DataCenterResource();
+ }
}
@Configuration
diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataCenterResource.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataCenterResource.java
new file mode 100644
index 000000000..8c438d85e
--- /dev/null
+++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/DataCenterResource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.meta.resource;
+
+import com.alipay.sofa.registry.core.model.Result;
+import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig;
+import com.alipay.sofa.registry.server.meta.resource.filter.LeaderAwareRestController;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * @author huicha
+ * @date 2025/9/22
+ */
+@Path("datacenter")
+@LeaderAwareRestController
+public class DataCenterResource {
+
+ private Logger LOGGER = LoggerFactory.getLogger(DataCenterResource.class);
+
+ @Autowired private MetaServerConfig metaServerConfig;
+
+ /**
+ * Returns the local data center identifier wrapped in a Result.
+ *
+ * On success returns Result.success() with the local data center string in the result message.
+ * On failure returns a failed Result with message "Query meta local datacenter exception".
+ *
+ * @return a Result whose message contains the local data center on success, or a failed Result on error
+ */
+ @GET
+ @Path("query")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Result queryBlackList() {
+ try {
+ String localDataCenter = this.metaServerConfig.getLocalDataCenter();
+ Result result = Result.success();
+ result.setMessage(localDataCenter);
+ return result;
+ } catch (Throwable throwable) {
+ LOGGER.error("Query meta local datacenter exception", throwable);
+ return Result.failed("Query meta local datacenter exception");
+ }
+ }
+
+}
diff --git a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java
index 1583b995b..d023250ef 100644
--- a/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java
+++ b/server/server/meta/src/main/java/com/alipay/sofa/registry/server/meta/resource/MultiClusterSyncResource.java
@@ -27,12 +27,8 @@
import com.alipay.sofa.registry.util.StringFormatter;
import com.google.common.collect.Sets;
import java.util.Locale;
-import javax.ws.rs.FormParam;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
+import java.util.Set;
+import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
@@ -51,6 +47,14 @@ public class MultiClusterSyncResource {
@Autowired private MultiClusterSyncRepository multiClusterSyncRepository;
+ /**
+ * Retrieves the MultiClusterSyncInfo for the given remote data center.
+ *
+ * If {@code remoteDataCenter} is blank, the response is a failed GenericResponse with an explanatory message.
+ *
+ * @param remoteDataCenter the remote data center identifier to query
+ * @return a GenericResponse containing the found MultiClusterSyncInfo on success, or a failed response when the input is blank or no info is found
+ */
@GET
@Path("query")
@Produces(MediaType.APPLICATION_JSON)
@@ -66,6 +70,37 @@ public GenericResponse Fetches all locally stored MultiClusterSyncInfo entries and returns them wrapped in a
+ * GenericResponse. If no entries exist, the response contains an empty set.
+ *
+ * @return a GenericResponse whose data is a Set of MultiClusterSyncInfo representing local sync
+ * configurations
+ */
+ @GET
+ @Path("queryAll")
+ @Produces(MediaType.APPLICATION_JSON)
+ public GenericResponse The removal is performed only if the provided expectVersion matches the stored
+ * configuration's data version and sync is currently disabled for that remote data center.
+ *
+ * @param remoteDataCenter the identifier of the remote data center to remove
+ * @param expectVersion the expected current data version (used for optimistic concurrency control)
+ * @param token authentication token required to authorize this operation
+ * @return a CommonResponse whose success flag is true when a configuration was removed, false otherwise
+ */
@POST
@Path("/remove")
@Produces(MediaType.APPLICATION_JSON)
@@ -517,4 +563,128 @@ public CommonResponse removeConfig(
response.setSuccess(ret > 0);
return response;
}
+
+ /**
+ * Add comma-separated dataInfoIds to the ignore list for a remote data center.
+ *
+ * Validates the provided admin token, ensures inputs are non-empty and the supplied
+ * expectVersion matches the stored data version, then updates the MultiClusterSyncInfo
+ * by adding the given IDs to its ignoreDataInfoIds set, increments the data version,
+ * and persists the change.
+ *
+ * @param remoteDataCenter identifier of the remote data center whose config will be modified
+ * @param ignoreDataInfoIds comma-separated dataInfoId values to add to the ignore list
+ * @param token admin token used for authentication
+ * @param expectVersion expected current data version (used for optimistic concurrency)
+ * @return CommonResponse with success=true if the update was persisted, false otherwise
+ */
+ @POST
+ @Path("/sync/ignoreDataInfoIds/add")
+ @Produces(MediaType.APPLICATION_JSON)
+ public CommonResponse addIgnoreSyncDataInfoIds(
+ @FormParam("remoteDataCenter") String remoteDataCenter,
+ @FormParam("ignoreDataInfoIds") String ignoreDataInfoIds,
+ @FormParam("token") String token,
+ @FormParam("expectVersion") String expectVersion) {
+ if (!AuthChecker.authCheck(token)) {
+ LOG.error(
+ "add ignoreDataInfoIds, remoteDataCenter={}, ignoreDataInfoIds={}, auth check={} fail!",
+ remoteDataCenter,
+ ignoreDataInfoIds,
+ token);
+ return GenericResponse.buildFailedResponse("auth check fail");
+ }
+
+ if (StringUtils.isBlank(remoteDataCenter)
+ || StringUtils.isBlank(ignoreDataInfoIds)
+ || StringUtils.isBlank(expectVersion)) {
+ return CommonResponse.buildFailedResponse(
+ "remoteDataCenter, ignoreDataInfoIds, expectVersion is not allow empty.");
+ }
+
+ MultiClusterSyncInfo exist = multiClusterSyncRepository.query(remoteDataCenter);
+
+ if (exist == null || exist.getDataVersion() != Long.parseLong(expectVersion)) {
+ return CommonResponse.buildFailedResponse(
+ StringFormatter.format(
+ "remoteDataCenter:{}, expectVersion:{} not exist.", remoteDataCenter, expectVersion));
+ }
+
+ exist.getIgnoreDataInfoIds().addAll(Sets.newHashSet(ignoreDataInfoIds.split(",")));
+ exist.setDataVersion(PersistenceDataBuilder.nextVersion());
+ boolean ret = multiClusterSyncRepository.update(exist, NumberUtils.toLong(expectVersion));
+
+ LOG.info(
+ "[addIgnoreSyncDataInfoIds]result:{}, remoteDataCenter:{}, ignoreDataInfoIds:{}, expectVersion:{}",
+ ret,
+ remoteDataCenter,
+ ignoreDataInfoIds,
+ expectVersion);
+
+ CommonResponse response = new CommonResponse();
+ response.setSuccess(ret);
+ return response;
+ }
+
+ /**
+ * Removes the given comma-separated ignore data-info IDs from the ignore list of the
+ * MultiClusterSyncInfo for the specified remote data center, increments the data version,
+ * and persists the change.
+ *
+ * Performs an authentication check, validates inputs and the expected data version before
+ * applying the removal. If authentication fails, inputs are invalid, or the expected version
+ * does not match the stored version, a failed CommonResponse is returned and no change is made.
+ *
+ * @param remoteDataCenter target remote data center identifier
+ * @param ignoreDataInfoIds comma-separated data-info IDs to remove from the ignore set
+ * @param token authentication token used for authorization
+ * @param expectVersion expected current data version (used for optimistic concurrency)
+ * @return a CommonResponse whose success flag is true when the update was persisted
+ */
+ @POST
+ @Path("/sync/ignoreDataInfoIds/remove")
+ @Produces(MediaType.APPLICATION_JSON)
+ public CommonResponse removeIgnoreDataInfoIds(
+ @FormParam("remoteDataCenter") String remoteDataCenter,
+ @FormParam("ignoreDataInfoIds") String ignoreDataInfoIds,
+ @FormParam("token") String token,
+ @FormParam("expectVersion") String expectVersion) {
+ if (!AuthChecker.authCheck(token)) {
+ LOG.error(
+ "remove ignoreDataInfoIds, remoteDataCenter={}, ignoreDataInfoIds={}, auth check={} fail!",
+ remoteDataCenter,
+ ignoreDataInfoIds,
+ token);
+ return GenericResponse.buildFailedResponse("auth check fail");
+ }
+ if (StringUtils.isBlank(remoteDataCenter)
+ || StringUtils.isBlank(ignoreDataInfoIds)
+ || StringUtils.isBlank(expectVersion)) {
+ return CommonResponse.buildFailedResponse(
+ "remoteDataCenter, ignoreDataInfoIds, expectVersion is not allow empty.");
+ }
+
+ MultiClusterSyncInfo exist = multiClusterSyncRepository.query(remoteDataCenter);
+
+ if (exist == null || exist.getDataVersion() != Long.parseLong(expectVersion)) {
+ return CommonResponse.buildFailedResponse(
+ StringFormatter.format(
+ "remoteDataCenter:{}, expectVersion:{} not exist.", remoteDataCenter, expectVersion));
+ }
+
+ exist.getIgnoreDataInfoIds().removeAll(Sets.newHashSet(ignoreDataInfoIds.split(",")));
+ exist.setDataVersion(PersistenceDataBuilder.nextVersion());
+ boolean ret = multiClusterSyncRepository.update(exist, NumberUtils.toLong(expectVersion));
+
+ LOG.info(
+ "[removeIgnoreDataInfoIds]result:{}, remoteDataCenter:{}, ignoreDataInfoIds:{}, expectVersion:{}",
+ ret,
+ remoteDataCenter,
+ ignoreDataInfoIds,
+ expectVersion);
+
+ CommonResponse response = new CommonResponse();
+ response.setSuccess(ret);
+ return response;
+ }
}
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java
index 04180cd10..7da84da5e 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/bootstrap/SessionServerConfiguration.java
@@ -45,30 +45,9 @@
import com.alipay.sofa.registry.server.session.metadata.MetadataCacheRegistry;
import com.alipay.sofa.registry.server.session.multi.cluster.DataCenterMetadataCache;
import com.alipay.sofa.registry.server.session.multi.cluster.DataCenterMetadataCacheImpl;
-import com.alipay.sofa.registry.server.session.node.service.ClientNodeService;
-import com.alipay.sofa.registry.server.session.node.service.ClientNodeServiceImpl;
-import com.alipay.sofa.registry.server.session.node.service.DataNodeService;
-import com.alipay.sofa.registry.server.session.node.service.DataNodeServiceImpl;
-import com.alipay.sofa.registry.server.session.node.service.MetaServerServiceImpl;
-import com.alipay.sofa.registry.server.session.node.service.SessionMetaServerManager;
-import com.alipay.sofa.registry.server.session.providedata.AppRevisionWriteSwitchService;
-import com.alipay.sofa.registry.server.session.providedata.CompressPushService;
-import com.alipay.sofa.registry.server.session.providedata.ConfigProvideDataWatcher;
-import com.alipay.sofa.registry.server.session.providedata.FetchBlackListService;
-import com.alipay.sofa.registry.server.session.providedata.FetchCircuitBreakerService;
-import com.alipay.sofa.registry.server.session.providedata.FetchClientOffAddressService;
-import com.alipay.sofa.registry.server.session.providedata.FetchDataInfoIDBlackListService;
-import com.alipay.sofa.registry.server.session.providedata.FetchGrayPushSwitchService;
-import com.alipay.sofa.registry.server.session.providedata.FetchPushEfficiencyConfigService;
-import com.alipay.sofa.registry.server.session.providedata.FetchShutdownService;
-import com.alipay.sofa.registry.server.session.providedata.FetchStopPushService;
-import com.alipay.sofa.registry.server.session.providedata.ProvideDataProcessorManager;
-import com.alipay.sofa.registry.server.session.push.ChangeProcessor;
-import com.alipay.sofa.registry.server.session.push.FirePushService;
-import com.alipay.sofa.registry.server.session.push.PushDataGenerator;
-import com.alipay.sofa.registry.server.session.push.PushProcessor;
-import com.alipay.sofa.registry.server.session.push.PushSwitchService;
-import com.alipay.sofa.registry.server.session.push.WatchProcessor;
+import com.alipay.sofa.registry.server.session.node.service.*;
+import com.alipay.sofa.registry.server.session.providedata.*;
+import com.alipay.sofa.registry.server.session.push.*;
import com.alipay.sofa.registry.server.session.registry.Registry;
import com.alipay.sofa.registry.server.session.registry.RegistryScanCallable;
import com.alipay.sofa.registry.server.session.registry.SessionRegistry;
@@ -77,65 +56,17 @@
import com.alipay.sofa.registry.server.session.remoting.DataNodeNotifyExchanger;
import com.alipay.sofa.registry.server.session.remoting.console.SessionConsoleExchanger;
import com.alipay.sofa.registry.server.session.remoting.console.handler.*;
-import com.alipay.sofa.registry.server.session.remoting.handler.AppRevisionSliceHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.ClientNodeConnectionHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.DataChangeRequestHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.DataPushRequestHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.DataSlotDiffDigestRequestHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.DataSlotDiffPublisherRequestHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.GetRevisionPbHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.MetaRevisionHeartbeatPbHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.MetadataRegisterPbHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.NotifyProvideDataChangeHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.PublisherHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.PublisherPbHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.ServiceAppMappingPbHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.SubscriberHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.SubscriberPbHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.SyncConfigHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.SyncConfigPbHandler;
-import com.alipay.sofa.registry.server.session.remoting.handler.WatcherHandler;
-import com.alipay.sofa.registry.server.session.resource.ClientManagerResource;
-import com.alipay.sofa.registry.server.session.resource.ClientsOpenResource;
-import com.alipay.sofa.registry.server.session.resource.CompressResource;
-import com.alipay.sofa.registry.server.session.resource.ConnectionsResource;
-import com.alipay.sofa.registry.server.session.resource.EmergencyApiResource;
-import com.alipay.sofa.registry.server.session.resource.HealthResource;
-import com.alipay.sofa.registry.server.session.resource.MetadataCacheResource;
-import com.alipay.sofa.registry.server.session.resource.PersistenceClientManagerResource;
-import com.alipay.sofa.registry.server.session.resource.SessionDigestResource;
-import com.alipay.sofa.registry.server.session.resource.SessionOpenResource;
-import com.alipay.sofa.registry.server.session.resource.SlotTableStatusResource;
+import com.alipay.sofa.registry.server.session.remoting.handler.*;
+import com.alipay.sofa.registry.server.session.resource.*;
import com.alipay.sofa.registry.server.session.scheduler.timertask.CacheCountTask;
import com.alipay.sofa.registry.server.session.scheduler.timertask.SessionCacheDigestTask;
import com.alipay.sofa.registry.server.session.scheduler.timertask.SyncClientsHeartbeatTask;
import com.alipay.sofa.registry.server.session.slot.SlotTableCache;
import com.alipay.sofa.registry.server.session.slot.SlotTableCacheImpl;
-import com.alipay.sofa.registry.server.session.store.DataStore;
-import com.alipay.sofa.registry.server.session.store.FetchPubSubDataInfoIdService;
-import com.alipay.sofa.registry.server.session.store.Interests;
-import com.alipay.sofa.registry.server.session.store.SessionDataStore;
-import com.alipay.sofa.registry.server.session.store.SessionInterests;
-import com.alipay.sofa.registry.server.session.store.SessionWatchers;
-import com.alipay.sofa.registry.server.session.store.Watchers;
-import com.alipay.sofa.registry.server.session.strategy.AppRevisionHandlerStrategy;
-import com.alipay.sofa.registry.server.session.strategy.PublisherHandlerStrategy;
-import com.alipay.sofa.registry.server.session.strategy.SessionRegistryStrategy;
-import com.alipay.sofa.registry.server.session.strategy.SubscriberHandlerStrategy;
-import com.alipay.sofa.registry.server.session.strategy.SyncConfigHandlerStrategy;
-import com.alipay.sofa.registry.server.session.strategy.WatcherHandlerStrategy;
-import com.alipay.sofa.registry.server.session.strategy.impl.DefaultAppRevisionHandlerStrategy;
-import com.alipay.sofa.registry.server.session.strategy.impl.DefaultPublisherHandlerStrategy;
-import com.alipay.sofa.registry.server.session.strategy.impl.DefaultSessionRegistryStrategy;
-import com.alipay.sofa.registry.server.session.strategy.impl.DefaultSubscriberHandlerStrategy;
-import com.alipay.sofa.registry.server.session.strategy.impl.DefaultSyncConfigHandlerStrategy;
-import com.alipay.sofa.registry.server.session.strategy.impl.DefaultWatcherHandlerStrategy;
-import com.alipay.sofa.registry.server.session.wrapper.AccessLimitWrapperInterceptor;
-import com.alipay.sofa.registry.server.session.wrapper.BlacklistWrapperInterceptor;
-import com.alipay.sofa.registry.server.session.wrapper.ClientCheckWrapperInterceptor;
-import com.alipay.sofa.registry.server.session.wrapper.ClientOffWrapperInterceptor;
-import com.alipay.sofa.registry.server.session.wrapper.DataInfoIDBlacklistWrapperInterceptor;
-import com.alipay.sofa.registry.server.session.wrapper.WrapperInterceptorManager;
+import com.alipay.sofa.registry.server.session.store.*;
+import com.alipay.sofa.registry.server.session.strategy.*;
+import com.alipay.sofa.registry.server.session.strategy.impl.*;
+import com.alipay.sofa.registry.server.session.wrapper.*;
import com.alipay.sofa.registry.server.shared.client.manager.BaseClientManagerService;
import com.alipay.sofa.registry.server.shared.client.manager.ClientManagerService;
import com.alipay.sofa.registry.server.shared.config.CommonConfig;
@@ -562,10 +493,27 @@ public MetadataCacheResource metadataCacheResource() {
return new MetadataCacheResource();
}
+ /**
+ * Creates and exposes the EmergencyApiResource Spring bean.
+ *
+ * This resource provides emergency-related HTTP endpoints for runtime operations.
+ *
+ * @return a new EmergencyApiResource instance
+ */
@Bean
public EmergencyApiResource emergencyApiResource() {
return new EmergencyApiResource();
}
+
+ /**
+ * Creates and exposes a PushEfficiencyConfigResource as a Spring bean.
+ *
+ * @return a new PushEfficiencyConfigResource instance
+ */
+ @Bean
+ public PushEfficiencyConfigResource pushEfficiencyConfigResource() {
+ return new PushEfficiencyConfigResource();
+ }
}
@Configuration
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java
index 0f4542453..57c471cff 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/providedata/FetchPushEfficiencyConfigService.java
@@ -21,10 +21,8 @@
import com.alipay.sofa.registry.log.Logger;
import com.alipay.sofa.registry.log.LoggerFactory;
import com.alipay.sofa.registry.server.session.bootstrap.SessionServerConfig;
-import com.alipay.sofa.registry.server.session.push.ChangeProcessor;
-import com.alipay.sofa.registry.server.session.push.FirePushService;
+import com.alipay.sofa.registry.server.session.push.PushEfficiencyConfigUpdater;
import com.alipay.sofa.registry.server.session.push.PushEfficiencyImproveConfig;
-import com.alipay.sofa.registry.server.session.push.PushProcessor;
import com.alipay.sofa.registry.server.shared.providedata.AbstractFetchSystemPropertyService;
import com.alipay.sofa.registry.server.shared.providedata.SystemDataStorage;
import com.alipay.sofa.registry.util.JsonUtils;
@@ -43,11 +41,16 @@ public class FetchPushEfficiencyConfigService
LoggerFactory.getLogger(FetchPushEfficiencyConfigService.class);
@Autowired private SessionServerConfig sessionServerConfig;
- @Autowired private ChangeProcessor changeProcessor;
- @Autowired private PushProcessor pushProcessor;
-
- @Autowired private FirePushService firePushService;
+ @Autowired private PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater;
+ /**
+ * Creates a FetchPushEfficiencyConfigService configured to fetch the push-efficiency
+ * improvement configuration from provider data.
+ *
+ * Initializes the underlying AbstractFetchSystemPropertyService with the data id
+ * for push task delay config and an initial SwitchStorage containing the initial
+ * version and a default PushEfficiencyImproveConfig instance.
+ */
public FetchPushEfficiencyConfigService() {
super(
ValueConstants.CHANGE_PUSH_TASK_DELAY_CONFIG_DATA_ID,
@@ -59,6 +62,20 @@ protected int getSystemPropertyIntervalMillis() {
return sessionServerConfig.getSystemPropertyIntervalMillis();
}
+ /**
+ * Processes provider data to update the PushEfficiencyImproveConfig if valid.
+ *
+ * Deserializes provider {@code data} into a {@link PushEfficiencyImproveConfig}, validates it,
+ * and, if applicable, atomically replaces the expected storage and applies the new configuration
+ * via {@code pushEfficiencyConfigUpdater}.
+ *
+ * @param expect the current expected {@link SwitchStorage} used for compare-and-set
+ * @param data provider data containing the config JSON
+ * @return {@code true} if processing succeeded (including the no-op case when the provided config
+ * string is blank); {@code false} on deserialization error, validation failure, out-of-date
+ * storage (compare-and-set failed), or when the config indicates it should not be applied
+ * (e.g., {@code inIpZoneSBF()} is false)
+ */
@Override
protected boolean doProcess(SwitchStorage expect, ProvideData data) {
final String configString = ProvideData.toString(data);
@@ -87,11 +104,9 @@ protected boolean doProcess(SwitchStorage expect, ProvideData data) {
if (!compareAndSet(expect, update)) {
return false;
}
- changeProcessor.setWorkDelayTime(pushEfficiencyImproveConfig);
- pushProcessor.setPushTaskDelayTime(pushEfficiencyImproveConfig);
- if (firePushService.getRegProcessor() != null) {
- firePushService.getRegProcessor().setWorkDelayTime(pushEfficiencyImproveConfig);
- }
+
+ this.pushEfficiencyConfigUpdater.updateFromProviderData(pushEfficiencyImproveConfig);
+
LOGGER.info(
"Fetch PushEfficiencyImproveConfig success, prev={}, current={}",
expect.pushEfficiencyImproveConfig,
@@ -99,28 +114,29 @@ protected boolean doProcess(SwitchStorage expect, ProvideData data) {
return true;
}
+ /**
+ * Injects a SessionServerConfig instance into this service (primarily for testing).
+ *
+ * Returns the service instance to allow fluent/chainable setup in tests.
+ *
+ * @return this FetchPushEfficiencyConfigService instance
+ */
@VisibleForTesting
- public FetchPushEfficiencyConfigService setChangeProcessor(ChangeProcessor changeProcessor) {
- this.changeProcessor = changeProcessor;
- return this;
- }
-
- @VisibleForTesting
- public FetchPushEfficiencyConfigService setPushProcessor(PushProcessor pushProcessor) {
- this.pushProcessor = pushProcessor;
- return this;
- }
-
- @VisibleForTesting
- public FetchPushEfficiencyConfigService setFirePushService(FirePushService firePushService) {
- this.firePushService = firePushService;
+ public FetchPushEfficiencyConfigService setSessionServerConfig(
+ SessionServerConfig sessionServerConfig) {
+ this.sessionServerConfig = sessionServerConfig;
return this;
}
+ /**
+ * Sets the PushEfficiencyConfigUpdater instance (typically used for testing) and returns this service for chaining.
+ *
+ * @return this FetchPushEfficiencyConfigService instance
+ */
@VisibleForTesting
- public FetchPushEfficiencyConfigService setSessionServerConfig(
- SessionServerConfig sessionServerConfig) {
- this.sessionServerConfig = sessionServerConfig;
+ public FetchPushEfficiencyConfigService setPushEfficiencyConfigUpdater(
+ PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater) {
+ this.pushEfficiencyConfigUpdater = pushEfficiencyConfigUpdater;
return this;
}
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java
new file mode 100644
index 000000000..3a09080c7
--- /dev/null
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyConfig.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.session.push;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.builder.ToStringBuilder;
+
+/**
+ * @author huicha
+ * @date 2025/7/24
+ */
+public class AutoPushEfficiencyConfig {
+
+ private static final int DEFAULT_WINDOW_NUM = 6;
+
+ private static final long DEFAULT_WINDOW_TIME_MILLIS = TimeUnit.SECONDS.toMillis(10L);
+
+ private static final long DEFAULT_PUSH_COUNT_THRESHOLD = 170000L;
+
+ private static final int DEFAULT_DEBOUNCING_TIME_MAX = 1000;
+
+ private static final int DEFAULT_DEBOUNCING_TIME_MIN = 100;
+
+ private static final int DEFAULT_DEBOUNCING_TIME_STEP = 100;
+
+ private static final int DEFAULT_MAX_DEBOUNCING_TIME_MAX = 3000;
+
+ private static final int DEFAULT_MAX_DEBOUNCING_TIME_MIN = 1000;
+
+ private static final int DEFAULT_MAX_DEBOUNCING_TIME_STEP = 200;
+
+ private static final double DEFAULT_LOAD_THRESHOLD = 6;
+
+ private boolean enableAutoPushEfficiency = false;
+
+ private int windowNum = DEFAULT_WINDOW_NUM;
+
+ private long windowTimeMillis = DEFAULT_WINDOW_TIME_MILLIS;
+
+ private long pushCountThreshold = DEFAULT_PUSH_COUNT_THRESHOLD;
+
+ // == 攒批时长参数 ==
+ // 启动攒批时长的自动化调整
+ private boolean enableDebouncingTime = false;
+
+ // 攒批时长的最大值
+ private int debouncingTimeMax = DEFAULT_DEBOUNCING_TIME_MAX;
+
+ // 攒批时长的最小值
+ private int debouncingTimeMin = DEFAULT_DEBOUNCING_TIME_MIN;
+
+ // 调整攒批时长的步长
+ private int debouncingTimeStep = DEFAULT_DEBOUNCING_TIME_STEP;
+
+ // 启动最大攒批时长的自动化调整
+ // 可以看下下面这个方法,最大攒批时长,和攒批时长是两个不同的指标
+ // @see com.alipay.sofa.registry.server.session.push.ChangeProcessor.Worker.setChangeTaskWorkDelay
+ private boolean enableMaxDebouncingTime = false;
+
+ // 最大攒批时长的最大值
+ private int maxDebouncingTimeMax = DEFAULT_MAX_DEBOUNCING_TIME_MAX;
+
+ // 最大攒批时长的最小值
+ private int maxDebouncingTimeMin = DEFAULT_MAX_DEBOUNCING_TIME_MIN;
+
+ // 最大调整攒批时长的步长
+ private int maxDebouncingTimeStep = DEFAULT_MAX_DEBOUNCING_TIME_STEP;
+ // == 攒批时长参数 ==
+
+ // == 开关流限流参数 ==
+ private boolean enableTrafficOperateLimitSwitch = false;
+
+ private double loadThreshold = DEFAULT_LOAD_THRESHOLD;
+ /**
+ * Returns whether auto-push efficiency optimizations are enabled.
+ *
+ * If true, the session push logic should apply the configured windowing, debouncing, and load
+ * thresholds to adapt push behavior for improved efficiency.
+ *
+ * @return true when auto-push efficiency is enabled; false otherwise
+ */
+
+ public boolean isEnableAutoPushEfficiency() {
+ return enableAutoPushEfficiency;
+ }
+
+ /**
+ * Enables or disables automatic push-efficiency behavior.
+ *
+ * @param enableAutoPushEfficiency true to enable auto push efficiency; false to disable it
+ */
+ public void setEnableAutoPushEfficiency(boolean enableAutoPushEfficiency) {
+ this.enableAutoPushEfficiency = enableAutoPushEfficiency;
+ }
+
+ /**
+ * Returns the number of time windows used to aggregate push counts for auto-push efficiency.
+ *
+ * @return the configured window count (defaults to {@code 6})
+ */
+ public int getWindowNum() {
+ return windowNum;
+ }
+
+ /**
+ * Set the number of sliding windows used to aggregate push metrics for auto-push efficiency.
+ *
+ * @param windowNum the number of windows (must be a positive integer; typical default: {@code 6})
+ */
+ public void setWindowNum(int windowNum) {
+ this.windowNum = windowNum;
+ }
+
+ /**
+ * Returns the configured window duration in milliseconds used for auto-push efficiency counting.
+ *
+ * @return window duration in milliseconds
+ */
+ public long getWindowTimeMillis() {
+ return windowTimeMillis;
+ }
+
+ /**
+ * Sets the duration of a single monitoring window used by the auto-push efficiency logic.
+ *
+ * @param windowTimeMillis duration of the window in milliseconds
+ */
+ public void setWindowTimeMillis(long windowTimeMillis) {
+ this.windowTimeMillis = windowTimeMillis;
+ }
+
+ /**
+ * Returns the configured push count threshold used to trigger auto-push efficiency logic.
+ *
+ * @return the number of pushes (within the configured window) that will trigger efficiency measures
+ */
+ public long getPushCountThreshold() {
+ return pushCountThreshold;
+ }
+
+ /**
+ * Sets the push count threshold used to trigger auto-push efficiency behavior.
+ *
+ * @param pushCountThreshold the minimum number of pushes within the configured window
+ * that will activate auto-push efficiency measures
+ */
+ public void setPushCountThreshold(long pushCountThreshold) {
+ this.pushCountThreshold = pushCountThreshold;
+ }
+
+ /**
+ * Returns whether debouncing time is enabled.
+ *
+ * @return true if debouncing time is enabled, false otherwise
+ */
+ public boolean isEnableDebouncingTime() {
+ return enableDebouncingTime;
+ }
+
+ /**
+ * Enable or disable debouncing time behavior.
+ *
+ * When enabled, the configured debouncingTimeMin, debouncingTimeMax and debouncingTimeStep
+ * values are used to adjust debouncing intervals for auto-push efficiency.
+ *
+ * @param enableDebouncingTime true to enable debouncing time adjustments, false to disable
+ */
+ public void setEnableDebouncingTime(boolean enableDebouncingTime) {
+ this.enableDebouncingTime = enableDebouncingTime;
+ }
+
+ /**
+ * Returns the maximum debouncing time in milliseconds.
+ *
+ * This value is the upper bound used when debouncing is enabled to limit how long pushes
+ * may be delayed.
+ *
+ * @return maximum debouncing time (milliseconds)
+ */
+ public int getDebouncingTimeMax() {
+ return debouncingTimeMax;
+ }
+
+ /**
+ * Sets the maximum debouncing time (in milliseconds) used when debouncing is enabled.
+ *
+ * @param debouncingTimeMax maximum debouncing delay in milliseconds
+ */
+ public void setDebouncingTimeMax(int debouncingTimeMax) {
+ this.debouncingTimeMax = debouncingTimeMax;
+ }
+
+ /**
+ * Returns the configured minimum debouncing time in milliseconds.
+ *
+ * This value is used as the lower bound when debouncing is enabled to delay push operations.
+ *
+ * @return minimum debouncing time in milliseconds
+ */
+ public int getDebouncingTimeMin() {
+ return debouncingTimeMin;
+ }
+
+ /**
+ * Sets the minimum debouncing time used when debouncing push operations.
+ *
+ * @param debouncingTimeMin minimum debouncing interval in milliseconds
+ */
+ public void setDebouncingTimeMin(int debouncingTimeMin) {
+ this.debouncingTimeMin = debouncingTimeMin;
+ }
+
+ /**
+ * Returns the configured step increment used when adjusting the debouncing time.
+ *
+ * @return the debouncing time step in milliseconds
+ */
+ public int getDebouncingTimeStep() {
+ return debouncingTimeStep;
+ }
+
+ /**
+ * Set the step size (in milliseconds) used when adjusting the debouncing time.
+ *
+ * @param debouncingTimeStep step increment in milliseconds for debouncing time adjustments
+ */
+ public void setDebouncingTimeStep(int debouncingTimeStep) {
+ this.debouncingTimeStep = debouncingTimeStep;
+ }
+
+ /**
+ * Returns whether the maximum debouncing-time feature is enabled.
+ *
+ * When enabled, the configured max debouncing time range/step settings are applied to bound
+ * debouncing durations.
+ *
+ * @return true if max debouncing-time enforcement is enabled, false otherwise
+ */
+ public boolean isEnableMaxDebouncingTime() {
+ return enableMaxDebouncingTime;
+ }
+
+ /**
+ * Enables or disables the use of a configurable maximum debouncing time.
+ *
+ * @param enableMaxDebouncingTime true to enable applying the configured maximum debouncing time; false to disable
+ */
+ public void setEnableMaxDebouncingTime(boolean enableMaxDebouncingTime) {
+ this.enableMaxDebouncingTime = enableMaxDebouncingTime;
+ }
+
+ /**
+ * Returns the configured upper bound (milliseconds) for the maximum debouncing time used by auto-push efficiency.
+ *
+ * When the max-debouncing-time feature is enabled, this value caps the computed maximum debouncing delay.
+ *
+ * @return upper bound in milliseconds for max debouncing time
+ */
+ public int getMaxDebouncingTimeMax() {
+ return maxDebouncingTimeMax;
+ }
+
+ /**
+ * Sets the upper bound for the maximum debouncing time.
+ *
+ * @param maxDebouncingTimeMax the maximum debouncing time, in milliseconds
+ */
+ public void setMaxDebouncingTimeMax(int maxDebouncingTimeMax) {
+ this.maxDebouncingTimeMax = maxDebouncingTimeMax;
+ }
+
+ /**
+ * Returns the minimum allowed max-debouncing time.
+ *
+ * @return the minimum max-debouncing time in milliseconds
+ */
+ public int getMaxDebouncingTimeMin() {
+ return maxDebouncingTimeMin;
+ }
+
+ /**
+ * Sets the minimum allowed value for the maximum debouncing time.
+ *
+ * This value is in milliseconds and defines the lower bound used when computing
+ * the maximum debouncing interval applied by the auto-push efficiency logic.
+ *
+ * @param maxDebouncingTimeMin minimum max-debouncing time in milliseconds
+ */
+ public void setMaxDebouncingTimeMin(int maxDebouncingTimeMin) {
+ this.maxDebouncingTimeMin = maxDebouncingTimeMin;
+ }
+
+ /**
+ * Returns the step size (in milliseconds) used when adjusting the maximum debouncing time.
+ *
+ * @return the max debouncing time step in milliseconds
+ */
+ public int getMaxDebouncingTimeStep() {
+ return maxDebouncingTimeStep;
+ }
+
+ /**
+ * Set the step size, in milliseconds, used when adjusting the maximum debouncing time.
+ *
+ * @param maxDebouncingTimeStep step size in milliseconds for each adjustment increment
+ */
+ public void setMaxDebouncingTimeStep(int maxDebouncingTimeStep) {
+ this.maxDebouncingTimeStep = maxDebouncingTimeStep;
+ }
+
+ /**
+ * Returns whether the traffic operate limit switch is enabled.
+ *
+ * When enabled, traffic-based operation limits are applied (see {@link #getLoadThreshold()}).
+ *
+ * @return true if the traffic operate limit switch is enabled; false otherwise
+ */
+ public boolean isEnableTrafficOperateLimitSwitch() {
+ return enableTrafficOperateLimitSwitch;
+ }
+
+ /**
+ * Returns the configured load threshold used by the traffic operate limit switch.
+ *
+ * This value (default 6.0) is the load boundary at which traffic operation limits may be applied.
+ *
+ * @return the current load threshold
+ */
+ public double getLoadThreshold() {
+ return loadThreshold;
+ }
+
+ /**
+ * Sets the load threshold used by the traffic operate limit switch.
+ *
+ * This value determines the load level at which traffic operation limits are applied
+ * when the traffic operate limit switch is enabled.
+ *
+ * @param loadThreshold the load threshold value
+ */
+ public void setLoadThreshold(double loadThreshold) {
+ this.loadThreshold = loadThreshold;
+ }
+
+ /**
+ * Returns a string representation of this configuration.
+ *
+ * The returned value is generated via reflection and includes the values of this object's fields.
+ *
+ * @return a human-readable string containing the names and values of the fields of this instance
+ */
+ @Override
+ public String toString() {
+ return ToStringBuilder.reflectionToString(this);
+ }
+}
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java
new file mode 100644
index 000000000..e50fb4cc7
--- /dev/null
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/AutoPushEfficiencyRegulator.java
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.session.push;
+
+import com.alipay.sofa.registry.log.Logger;
+import com.alipay.sofa.registry.log.LoggerFactory;
+import com.alipay.sofa.registry.util.ConcurrentUtils;
+import com.alipay.sofa.registry.util.LoopRunnable;
+import com.google.common.annotations.VisibleForTesting;
+import java.lang.management.ManagementFactory;
+import java.lang.management.OperatingSystemMXBean;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * 推送流控配置
+ *
+ * @author huicha
+ * @date 2025/7/23
+ */
+public class AutoPushEfficiencyRegulator extends LoopRunnable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger("AUTO-PUSH-EFFICIENCY-REGULATOR");
+
+ private static final AtomicLong ID_GENERATOR = new AtomicLong(0);
+
+ // 窗口的时长 (毫秒)
+ private final long windowTime;
+
+ // 窗口数量
+ private final int windowNum;
+
+ // 窗口; 用于统计每个窗口内的推送次数
+ private final AtomicLong[] windows;
+
+ // 当前窗口的索引
+ private final AtomicInteger index;
+
+ // 阈值; 推送次数高于这个阈值的时候会开始逐渐调整攒批配置
+ private final long pushCountThreshold;
+
+ // 阈值; 用于调整开关流限流开关的 Load 阈值。
+ // 不影响推送攒批配置
+ private final double loadThreshold;
+
+ // 预热次数,等到所有的窗口都轮换过一遍之后才能开始统计
+ // 这里因为 warmupTimes 的值总是单线程读写的,因此没有加 volatile 关键字
+ private int warmupTimes;
+
+ // 唯一 ID
+ private final Long id;
+
+ // 推送效率配置更新器
+ private final PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater;
+
+ // 采集系统负载指标
+ private final OperatingSystemMXBean operatingSystemMXBean;
+
+ // 攒批时长
+ private final IntMetric debouncingTime;
+
+ // 最大攒批时长
+ private final IntMetric maxDebouncingTime;
+
+ // 开关流限流
+ // 因为命名为 enable traffic operate 在这里可能会有些歧义
+ // 可能会有人理解成 load 过高时是否操作开关流限流,因此这里命名的比较长
+ private final BooleanMetric trafficOperateLimitSwitch;
+
+ /**
+ * Creates and starts an AutoPushEfficiencyRegulator that monitors push activity and adapts
+ * debouncing and traffic-limit settings over rolling time windows.
+ *
+ * The constructor reads configuration from the provided PushEfficiencyImproveConfig, initializes
+ * the rolling windows and internal metrics (debouncingTime, maxDebouncingTime, trafficOperateLimitSwitch),
+ * assigns an internal id, and starts a daemon thread to run the regulator loop.
+ *
+ * @param pushEfficiencyImproveConfig configuration source used to initialize window sizes,
+ * thresholds, and metric bounds/steps
+ */
+ public AutoPushEfficiencyRegulator(
+ PushEfficiencyImproveConfig pushEfficiencyImproveConfig,
+ PushEfficiencyConfigUpdater pushEfficiencyConfigUpdater) {
+ // 获取自适应攒批配置
+ AutoPushEfficiencyConfig autoPushEfficiencyConfig =
+ pushEfficiencyImproveConfig.getAutoPushEfficiencyConfig();
+
+ // 初始化窗口相关配置
+ this.windowTime = autoPushEfficiencyConfig.getWindowTimeMillis();
+ this.windowNum = autoPushEfficiencyConfig.getWindowNum();
+ this.windows = new AtomicLong[windowNum];
+ for (int i = 0; i < windowNum; i++) {
+ this.windows[i] = new AtomicLong(0);
+ }
+ this.index = new AtomicInteger(0);
+
+ // 设置其他参数
+ this.id = ID_GENERATOR.incrementAndGet();
+ this.pushCountThreshold = autoPushEfficiencyConfig.getPushCountThreshold();
+ this.loadThreshold = autoPushEfficiencyConfig.getLoadThreshold();
+ this.warmupTimes = 0;
+ this.pushEfficiencyConfigUpdater = pushEfficiencyConfigUpdater;
+ this.operatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean();
+
+ // 初始化可能需要调整的指标
+ this.debouncingTime =
+ new IntMetric(
+ autoPushEfficiencyConfig.isEnableDebouncingTime(),
+ pushEfficiencyImproveConfig.getChangeDebouncingMillis(),
+ autoPushEfficiencyConfig.getDebouncingTimeMax(),
+ autoPushEfficiencyConfig.getDebouncingTimeMin(),
+ autoPushEfficiencyConfig.getDebouncingTimeStep());
+ this.maxDebouncingTime =
+ new IntMetric(
+ autoPushEfficiencyConfig.isEnableMaxDebouncingTime(),
+ pushEfficiencyImproveConfig.getChangeDebouncingMaxMillis(),
+ autoPushEfficiencyConfig.getMaxDebouncingTimeMax(),
+ autoPushEfficiencyConfig.getMaxDebouncingTimeMin(),
+ autoPushEfficiencyConfig.getMaxDebouncingTimeStep());
+ this.trafficOperateLimitSwitch =
+ new BooleanMetric(autoPushEfficiencyConfig.isEnableTrafficOperateLimitSwitch());
+
+ // 启动定时任务
+ ConcurrentUtils.createDaemonThread("AutoPushEfficiencyRegulator-" + this.id, this).start();
+ }
+
+ /**
+ * Atomically increments the push counter for the currently active rolling window.
+ *
+ * This is a no-op when the regulator is closed. Errors thrown during the increment are caught
+ * to prevent propagation.
+ */
+ public void safeIncrementPushCount() {
+ try {
+ if (this.isClosed()) {
+ return;
+ }
+ int currentIndex = this.index.get();
+ this.windows[currentIndex].incrementAndGet();
+ } catch (Throwable throwable) {
+ LOGGER.error(
+ "[module=AutoPushEfficiencyRegulator][method=safeIncrementPushCount] increment push count exception",
+ throwable);
+ }
+ }
+
+ /**
+ * Advance the rolling-window index to the next window, wrapping to zero when needed.
+ *
+ * The target window's counter is cleared to 0 before the active index is updated,
+ * ensuring the newly activated window starts with an empty count.
+ */
+ private void rollWindow() {
+ // 1. 获取当前窗口的索引
+ int currentIndex = this.index.get();
+
+ // 2. 计算出下一个窗口的索引
+ int newIndex = currentIndex + 1;
+ if (newIndex >= this.windowNum) {
+ newIndex = 0;
+ }
+
+ // 3. 先清空下一个窗口的统计值,然后再更新索引
+ this.windows[newIndex].set(0);
+ this.index.set(newIndex);
+ }
+
+ /**
+ * Sums and returns the total push count across all rolling windows.
+ *
+ * @return the aggregated push count from every window's AtomicLong counter
+ */
+ private long computeTotalPushCount() {
+ long totalPushCount = 0;
+ for (int forIndex = 0; forIndex < this.windows.length; forIndex++) {
+ totalPushCount += this.windows[forIndex].get();
+ }
+ return totalPushCount;
+ }
+
+ /**
+ * Determines whether the total push count across all rolling windows exceeds the configured threshold.
+ *
+ * @return true if the sum of all window push counters is greater than {@code pushCountThreshold}; false otherwise
+ */
+ private boolean checkPushCountIsHigh() {
+ long totalPushCount = this.computeTotalPushCount();
+ return totalPushCount > this.pushCountThreshold;
+ }
+
+ /**
+ * Reads the current debouncing and max debouncing metrics and applies them to the
+ * PushEfficiencyConfigUpdater, logging the values with the provided tag.
+ *
+ * @param tag short label indicating the reason or source of this update (used in logs)
+ */
+ private void updateDebouncingTime(String tag) {
+ int debouncingTime = this.debouncingTime.load();
+ int maxDebouncingTime = this.maxDebouncingTime.load();
+ LOGGER.info(
+ "[ID: {}][{}] debouncingTime: {} maxDebouncingTime: {}",
+ this.id,
+ tag,
+ debouncingTime,
+ maxDebouncingTime);
+ this.pushEfficiencyConfigUpdater.updateDebouncingTime(debouncingTime, maxDebouncingTime);
+ }
+
+ /**
+ * Read the current traffic-operate-limit switch state and apply it to the configuration updater.
+ *
+ * Retrieves the switch value from the internal metric and forwards it to
+ * PushEfficiencyConfigUpdater.updateTrafficOperateLimitSwitch.
+ */
+ private void updateTrafficOperateLimitSwitch() {
+ boolean trafficOperateLimitSwitch = this.trafficOperateLimitSwitch.load();
+ LOGGER.info("[ID: {}] trafficOperateLimitSwitch: {}", this.id, trafficOperateLimitSwitch);
+ this.pushEfficiencyConfigUpdater.updateTrafficOperateLimitSwitch(trafficOperateLimitSwitch);
+ }
+
+ /**
+ * Returns the regulator's unique identifier.
+ *
+ * @return the unique id of this AutoPushEfficiencyRegulator
+ */
+ public Long getId() {
+ return id;
+ }
+
+ /**
+ * Main periodic loop executed by the regulator thread: performs warmup window rotation, evaluates
+ * recent push activity, and adjusts debouncing and traffic-limit settings accordingly.
+ *
+ * Behavior:
+ * - If the regulator is closed, the method returns immediately.
+ * - During warmup (fewer than configured windows have been rotated) it advances the rolling
+ * window and increments the warmup counter without evaluating push rates.
+ * - After warmup completes, it computes whether the total push count across all windows exceeds
+ * the configured threshold, then attempts to update debouncing-related metrics and the
+ * traffic-operate-limit switch based on that result and current system load.
+ * - Finally, it advances (rolls) the window. The final roll is intentionally performed after
+ * evaluations to avoid artificially lowering the observed push rate by clearing the newest
+ * window prematurely.
+ */
+ @Override
+ public void runUnthrowable() {
+ if (this.isClosed()) {
+ // 如果任务已经被停止了,那么这个进程需要退出
+ return;
+ }
+
+ // 1. 检查是否所有的窗口都轮换过了,即是否完成了预热
+ if (this.warmupTimes < this.windowNum) {
+ // 如果还没有,那么直接滚动窗口,不需要去检查推送频率
+ this.rollWindow();
+ this.warmupTimes++;
+ return;
+ }
+
+ // 2. 已经完成预热了,检查推送频率是否过高
+ boolean pushCountIsHigh = this.checkPushCountIsHigh();
+
+ // 3. 根据推送频率调整推送配置
+ this.tryUpdatePushConfig(pushCountIsHigh);
+
+ // 4. 根据推送频率以及负载情况调整开关流限流配置
+ this.tryUpdateTrafficOperateLimitSwitch(pushCountIsHigh);
+
+ // 3. 滚动窗口
+ // 这里放到最后滚动窗口是因为:
+ // 滚动窗口时,会把最新的窗口计数清零,如果先滚动后检查推送频率,
+ // 那么感知到的推送频率就会偏小一点
+ this.rollWindow();
+ }
+
+ /**
+ * Evaluate system load and push activity to toggle the traffic-operate-limit switch.
+ *
+ * If the traffic-operate-limit feature is enabled, this method reads the system
+ * 1-minute load average and:
+ * - turns the switch on when both the recent push count is high and load > loadThreshold,
+ * - turns the switch off otherwise.
+ *
+ * If the switch state actually changes, updateTrafficOperateLimitSwitch() is invoked
+ * to persist the change via the configured updater.
+ *
+ * @param pushCountIsHigh true when the aggregated push count across rolling windows exceeds the configured threshold
+ */
+ private void tryUpdateTrafficOperateLimitSwitch(boolean pushCountIsHigh) {
+ if (!this.trafficOperateLimitSwitch.isEnable()) {
+ // 如果没有开启支持操作开关流,那么就不执行后续的代码了,尽量尝试避免获取系统负载
+ return;
+ }
+
+ // 这里获取到的是过去一分钟的负载平均值,这个值有可能小于 0,小于 0 时表示无法获取平均负载
+ // 另外,这个方法的注释上写了这个方法设计上就会考虑可能较频繁调用,因此这里先不考虑做限制了
+ double loadAverage = this.operatingSystemMXBean.getSystemLoadAverage();
+ if (loadAverage < 0) {
+ return;
+ }
+
+ boolean loadIsHigh = loadAverage > loadThreshold;
+
+ if (pushCountIsHigh && loadIsHigh) {
+ if (this.trafficOperateLimitSwitch.tryTurnOn()) {
+ this.updateTrafficOperateLimitSwitch();
+ }
+ } else {
+ if (this.trafficOperateLimitSwitch.tryTurnOff()) {
+ this.updateTrafficOperateLimitSwitch();
+ }
+ }
+ }
+
+ /**
+ * Adjusts debouncing-related push configuration based on current push rate.
+ *
+ * If `pushCountIsHigh` is true, attempts to increment debouncing metrics; if any metric
+ * changes, calls {@code updateDebouncingTime("Increment")}. If false, attempts to
+ * decrement metrics and, on change, calls {@code updateDebouncingTime("Decrement")}.
+ *
+ * @param pushCountIsHigh whether the recent aggregated push count exceeds the configured threshold
+ */
+ private void tryUpdatePushConfig(boolean pushCountIsHigh) {
+ if (pushCountIsHigh) {
+ // 推送频率过高,尝试更新攒批时长
+ if (this.tryIncrementPushConfig()) {
+ this.updateDebouncingTime("Increment");
+ }
+ } else {
+ // 推送频率正常,此时尝试逐渐降低攒批时长
+ if (this.tryDecrementPushConfig()) {
+ this.updateDebouncingTime("Decrement");
+ }
+ }
+ }
+
+ /**
+ * Attempt to increase push-related debounce metrics.
+ *
+ * Tries to increment the configured debouncingTime and maxDebouncingTime metrics
+ * (if each metric is enabled and not already at its maximum). Returns true if
+ * at least one metric was changed.
+ *
+ * @return true if either debouncingTime or maxDebouncingTime was successfully incremented
+ */
+ private boolean tryIncrementPushConfig() {
+ boolean dataChange = false;
+
+ if (debouncingTime.tryIncrement()) {
+ dataChange = true;
+ }
+
+ if (maxDebouncingTime.tryIncrement()) {
+ dataChange = true;
+ }
+
+ return dataChange;
+ }
+
+ /**
+ * Attempt to decrease configurable push debouncing metrics.
+ *
+ * Tries to decrement the internal debouncingTime and maxDebouncingTime metrics (if enabled
+ * and above their minimums). Returns true if at least one metric was changed.
+ *
+ * @return true if any metric value was decremented, false otherwise
+ */
+ private boolean tryDecrementPushConfig() {
+ boolean dataChange = false;
+
+ if (debouncingTime.tryDecrement()) {
+ dataChange = true;
+ }
+
+ if (maxDebouncingTime.tryDecrement()) {
+ dataChange = true;
+ }
+
+ return dataChange;
+ }
+
+ /**
+ * Sleeps uninterruptibly for the regulator's configured window duration.
+ *
+ * Blocks the caller for {@code windowTime} milliseconds between regulation cycles.
+ */
+ @Override
+ public void waitingUnthrowable() {
+ ConcurrentUtils.sleepUninterruptibly(this.windowTime, TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Returns the configured number of rolling windows used by the regulator.
+ *
+ * @return the number of time windows in the rolling window array
+ */
+ @VisibleForTesting
+ public int getWindowNum() {
+ return this.windowNum;
+ }
+
+ /**
+ * Returns the number of rolling window counters.
+ *
+ * Useful for testing/inspection to verify how many windows the regulator maintains.
+ *
+ * @return the configured number of windows (length of the internal windows array)
+ */
+ @VisibleForTesting
+ public int getWindowsSize() {
+ return this.windows.length;
+ }
+
+ /**
+ * Returns the configured total-push threshold used to determine a high push rate.
+ *
+ * @return the push count threshold (sum of pushes across all rolling windows)
+ */
+ @VisibleForTesting
+ public long getPushCountThreshold() {
+ return this.pushCountThreshold;
+ }
+}
+
+class IntMetric {
+
+ private final boolean enable;
+
+ // 当不启用自适应攒批时,指标的默认值
+ private final int defaultV;
+
+ // 指标的最大值
+ private final int max;
+
+ // 指标的最小值
+ private final int min;
+
+ // 指标的步长
+ private final int step;
+
+ // 当前指标的值
+ private int current;
+
+ /**
+ * Create an IntMetric controlling a bounded integer value that can be stepped up or down.
+ *
+ * @param enable whether the metric is active (if false, load() will return {@code defaultV} and
+ * tryIncrement/tryDecrement will be no-ops)
+ * @param defaultV value to return from load() when the metric is not enabled
+ * @param max maximum allowed value for the metric (upper bound applied when incrementing)
+ * @param min minimum allowed value for the metric (lower bound applied when decrementing);
+ * also used to initialize the current value
+ * @param step amount to change the current value on each increment/decrement operation
+ */
+ public IntMetric(boolean enable, int defaultV, int max, int min, int step) {
+ this.enable = enable;
+ this.defaultV = defaultV;
+ this.max = max;
+ this.min = min;
+ this.step = step;
+ this.current = min;
+ }
+
+ /**
+ * Attempt to increase the metric by one step, up to the configured maximum.
+ *
+ * If the metric is disabled, this is a no-op and returns false. When enabled,
+ * the current value is increased by `step` but never exceeds `max` (the value
+ * is clamped to `max`). Returns true if the value changed.
+ *
+ * @return true if the metric was incremented, false if disabled or already at max
+ */
+ public boolean tryIncrement() {
+ if (!this.enable) {
+ return false;
+ }
+
+ if (this.current < this.max) {
+ int newValue = this.current + this.step;
+ if (newValue > this.max) {
+ newValue = this.max;
+ }
+ this.current = newValue;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Attempt to decrease the metric by one step, bounded by the configured minimum.
+ *
+ * If the metric is disabled or already at its minimum value, this is a no-op and returns false.
+ * When a decrement occurs, the current value is reduced by `step` but not below `min`.
+ *
+ * @return true if the current value was decreased; false otherwise
+ */
+ public boolean tryDecrement() {
+ if (!this.enable) {
+ return false;
+ }
+ if (this.current > this.min) {
+ int newValue = this.current - this.step;
+ if (newValue < this.min) {
+ newValue = this.min;
+ }
+ this.current = newValue;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Returns whether this metric is enabled.
+ *
+ * @return true if the metric is active and can be changed; false otherwise
+ */
+ public boolean isEnable() {
+ return this.enable;
+ }
+
+ /**
+ * Returns the active value of this metric.
+ *
+ * If the metric is enabled, returns the current configured value; otherwise returns the defined default value.
+ *
+ * @return the metric value to use (current when enabled, default when disabled)
+ */
+ public int load() {
+ if (this.enable) {
+ return this.current;
+ } else {
+ return this.defaultV;
+ }
+ }
+
+ /**
+ * Returns the configured default value for this metric.
+ *
+ * This value is intended to be used when the metric is not enabled.
+ *
+ * @return the metric's default integer value
+ */
+ public int loadDefaultV() {
+ return this.defaultV;
+ }
+}
+
+class BooleanMetric {
+
+ private final boolean enable;
+
+ private boolean current;
+
+ /**
+ * Create a BooleanMetric.
+ *
+ * @param enable whether this boolean metric is active; when false, state changes will be ignored
+ */
+ public BooleanMetric(boolean enable) {
+ this.enable = enable;
+ this.current = false;
+ }
+
+ /**
+ * Attempts to enable (turn on) this boolean metric.
+ *
+ * If the metric is not enabled or is already on, the method does nothing.
+ *
+ * @return true if the metric was off and was changed to on; false if it was already on or not enabled.
+ */
+ public boolean tryTurnOn() {
+ if (!this.enable) {
+ return false;
+ }
+ if (this.current) {
+ return false;
+ } else {
+ this.current = true;
+ return true;
+ }
+ }
+
+ /**
+ * Attempts to turn the metric off.
+ *
+ * If the metric is not enabled this is a no-op and returns false. If the metric is enabled and currently on,
+ * it flips the state to off and returns true. If it is already off, returns false.
+ *
+ * @return true if the metric was on and was turned off; false if it was already off or not enabled
+ */
+ public boolean tryTurnOff() {
+ if (!this.enable) {
+ return false;
+ }
+ if (this.current) {
+ this.current = false;
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Returns whether this metric is enabled.
+ *
+ * @return true if the metric is active and can be changed; false otherwise
+ */
+ public boolean isEnable() {
+ return this.enable;
+ }
+
+ /**
+ * Returns the current boolean state of this metric.
+ *
+ * Callers may check isEnable() to determine whether the metric is active; this method always returns the last stored state.
+ *
+ * @return true if the metric is currently on, false otherwise
+ */
+ public boolean load() {
+ return this.current;
+ }
+}
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java
new file mode 100644
index 000000000..7b2838d63
--- /dev/null
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeDebouncingTime.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.registry.server.session.push;
+
+/**
+ * @author huicha
+ * @date 2025/9/8
+ */
+public class ChangeDebouncingTime {
+
+ private int changeDebouncingMillis;
+
+ private int changeDebouncingMaxMillis;
+
+ /**
+ * Creates a ChangeDebouncingTime with default values (both debouncing fields set to 0).
+ */
+public ChangeDebouncingTime() {}
+
+ /**
+ * Create a ChangeDebouncingTime with specified debouncing durations.
+ *
+ * @param changeDebouncingMillis the base debouncing duration in milliseconds
+ * @param changeDebouncingMaxMillis the maximum allowed debouncing duration in milliseconds (upper bound for debouncing)
+ */
+ public ChangeDebouncingTime(int changeDebouncingMillis, int changeDebouncingMaxMillis) {
+ this.changeDebouncingMillis = changeDebouncingMillis;
+ this.changeDebouncingMaxMillis = changeDebouncingMaxMillis;
+ }
+
+ /**
+ * Returns the configured debounce interval for a change, in milliseconds.
+ *
+ * @return the change debouncing interval in milliseconds
+ */
+ public int getChangeDebouncingMillis() {
+ return changeDebouncingMillis;
+ }
+
+ /**
+ * Set the debouncing duration for a single change, in milliseconds.
+ *
+ * @param changeDebouncingMillis the debouncing time in milliseconds
+ */
+ public void setChangeDebouncingMillis(int changeDebouncingMillis) {
+ this.changeDebouncingMillis = changeDebouncingMillis;
+ }
+
+ /**
+ * Returns the maximum allowed debouncing delay for changes, in milliseconds.
+ *
+ * @return the maximum change debouncing time in milliseconds
+ */
+ public int getChangeDebouncingMaxMillis() {
+ return changeDebouncingMaxMillis;
+ }
+
+ /**
+ * Sets the maximum debouncing interval for change notifications, in milliseconds.
+ *
+ * @param changeDebouncingMaxMillis maximum debounce duration in milliseconds
+ */
+ public void setChangeDebouncingMaxMillis(int changeDebouncingMaxMillis) {
+ this.changeDebouncingMaxMillis = changeDebouncingMaxMillis;
+ }
+}
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java
index 4f6f3efc5..40d26fa30 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/ChangeProcessor.java
@@ -23,11 +23,7 @@
import com.alipay.sofa.registry.util.StringFormatter;
import com.alipay.sofa.registry.util.WakeUpLoopRunnable;
import com.google.common.collect.Maps;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
+import java.util.*;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
@@ -56,6 +52,16 @@ private Worker[] initWorkers() {
return workers;
}
+ /**
+ * Apply the given push-efficiency configuration to all workers' change-task delay settings.
+ *
+ * Iterates the internal dataCenterWorkers map and calls each Worker's
+ * setChangeTaskWorkDelay(...) with the provided configuration. If a data center
+ * entry contains a null Worker[] the method returns immediately and stops applying
+ * the configuration to any remaining entries.
+ *
+ * @param pushEfficiencyImproveConfig configuration containing delay values to apply to workers
+ */
public void setWorkDelayTime(PushEfficiencyImproveConfig pushEfficiencyImproveConfig) {
for (Map.Entry Enables or disables automatic push-efficiency regulation according to the provided
+ * PushEfficiencyImproveConfig. When auto mode is enabled this method will close any existing
+ * AutoPushEfficiencyRegulator, initialize debouncing values to their configured minimums when
+ * applicable, and create a new AutoPushEfficiencyRegulator. When auto mode is disabled it will
+ * close and clear any existing regulator. In all cases the updated regulator (or null) is pushed
+ * into the PushProcessor, and work delay settings are applied to ChangeProcessor, PushProcessor,
+ * and the FirePushService registration processor (if present). Finally, traffic limiting is turned
+ * off via the ClientManagerResource.
+ *
+ * If the component has been stopped, the call is a no-op.
+ *
+ * @param pushEfficiencyImproveConfig configuration object from provider describing push-efficiency
+ * settings (including optional AutoPushEfficiencyConfig and debouncing/max-debouncing values)
+ */
+ public void updateFromProviderData(PushEfficiencyImproveConfig pushEfficiencyImproveConfig) {
+ this.lock.lock();
+ try {
+ LOGGER.info(
+ "[PushEfficiencyConfigUpdater] update config from provider data: {}",
+ pushEfficiencyImproveConfig);
+
+ if (this.stop) {
+ // 已销毁
+ return;
+ }
+
+ AutoPushEfficiencyConfig autoPushEfficiencyConfig =
+ pushEfficiencyImproveConfig.getAutoPushEfficiencyConfig();
+ if (null != autoPushEfficiencyConfig
+ && autoPushEfficiencyConfig.isEnableAutoPushEfficiency()) {
+ // 新的配置中,开启了自动化配置
+ this.useAutoPushEfficiency = true;
+
+ if (null != this.autoPushEfficiencyRegulator) {
+ // 此时还存在正在运行的 AutoPushEfficiencyRegulator 则需要关掉
+ LOGGER.info(
+ "[PushEfficiencyConfigUpdater] close old auto push efficiency regulator, id: {}, will create new one",
+ this.autoPushEfficiencyRegulator.getId());
+ this.autoPushEfficiencyRegulator.close();
+ }
+
+ // 这里需要调整下初始配置的值
+ if (autoPushEfficiencyConfig.isEnableDebouncingTime()) {
+ // 当自适应攒批需要调整 debouncing time 的时候,需要将 debouncing time 的初始值设置为 min
+ pushEfficiencyImproveConfig.setChangeDebouncingMillis(
+ autoPushEfficiencyConfig.getDebouncingTimeMin());
+ }
+
+ if (autoPushEfficiencyConfig.isEnableMaxDebouncingTime()) {
+ // 当自适应攒批需要调整 max debouncing time 的时候,需要将 debouncing time 的初始值设置为 min
+ pushEfficiencyImproveConfig.setChangeDebouncingMaxMillis(
+ autoPushEfficiencyConfig.getMaxDebouncingTimeMin());
+ }
+
+ this.autoPushEfficiencyRegulator =
+ new AutoPushEfficiencyRegulator(pushEfficiencyImproveConfig, this);
+ } else {
+ // 新的配置中,关闭了自动化配置,此时如果还存在正在运行的 AutoPushEfficiencyRegulator 则需要关掉
+ this.useAutoPushEfficiency = false;
+
+ if (null != this.autoPushEfficiencyRegulator) {
+ LOGGER.info(
+ "[PushEfficiencyConfigUpdater] close old auto push efficiency regulator, id: {}, will not create new one",
+ this.autoPushEfficiencyRegulator.getId());
+ this.autoPushEfficiencyRegulator.close();
+ }
+
+ this.autoPushEfficiencyRegulator = null;
+ }
+
+ // 更新一下 PushProcessor 中的 AutoPushEfficiencyRegulator,以便于统计推送次数
+ this.pushProcessor.setAutoPushEfficiencyRegulator(this.autoPushEfficiencyRegulator);
+
+ // 更新配置
+ this.changeProcessor.setWorkDelayTime(pushEfficiencyImproveConfig);
+ this.pushProcessor.setPushTaskDelayTime(pushEfficiencyImproveConfig);
+ if (this.firePushService.getRegProcessor() != null) {
+ this.firePushService.getRegProcessor().setWorkDelayTime(pushEfficiencyImproveConfig);
+ }
+
+ // 无论如何,先关闭掉限流
+ this.clientManagerResource.setEnableTrafficOperate(true);
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ /**
+ * Update the change debouncing window used by automatic push-efficiency regulation.
+ *
+ * When auto push efficiency is not enabled this method is a no-op. Otherwise it
+ * updates the minimum and maximum debouncing delays (in milliseconds) applied
+ * by the ChangeProcessor.
+ *
+ * @param debouncingTime minimum debouncing delay in milliseconds
+ * @param maxDebouncingTime maximum debouncing delay in milliseconds
+ */
+ public void updateDebouncingTime(int debouncingTime, int maxDebouncingTime) {
+ this.lock.lock();
+ try {
+ if (!this.useAutoPushEfficiency) {
+ // 如果已经停止使用自动化配置了,那么这里就跳过更新,以防止最终实际使用的配置不是 ProvideData 中的配置
+ return;
+ }
+ this.changeProcessor.setChangeDebouncingMillis(debouncingTime, maxDebouncingTime);
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ /**
+ * Update the traffic operation limit switch when automatic push-efficiency regulation is enabled.
+ *
+ * If automatic mode is not active, this method is a no-op. The provided flag represents whether
+ * the traffic limit should be enabled; the updater inverts this flag before applying it because
+ * enabling the limit disables normal traffic operations.
+ *
+ * @param trafficOperateLimitSwitch true to enable traffic limiting, false to disable it
+ */
+ public void updateTrafficOperateLimitSwitch(boolean trafficOperateLimitSwitch) {
+ this.lock.lock();
+ try {
+ if (!this.useAutoPushEfficiency) {
+ // 如果已经停止使用自动化配置了,那么这里就跳过更新,以防止最终实际使用的配置不是 ProvideData 中的配置
+ return;
+ }
+
+ // 打开限制开关,意味着开启了限流,也就是不允许操作开关流,因此这里是反的
+ this.clientManagerResource.setEnableTrafficOperate(!trafficOperateLimitSwitch);
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ /**
+ * No-op implementation of SmartLifecycle.start().
+ *
+ * This component does not perform any action when the lifecycle is started; lifecycle
+ * state is managed via {@link #stop()} and {@link #isRunning()}.
+ */
+ @Override
+ public void start() {}
+
+ /**
+ * Stops the component and releases associated resources.
+ *
+ * Marks the updater as stopped and, if an AutoPushEfficiencyRegulator exists, closes it.
+ * This method is thread-safe; it acquires an internal lock while mutating state.
+ * It is intended to be called when the Spring bean is being destroyed.
+ */
+ @Override
+ public void stop() {
+ // Bean 被销毁的时候需要清理释放线程资源
+ this.lock.lock();
+ try {
+ if (!this.stop) {
+ this.stop = true;
+ if (null != this.autoPushEfficiencyRegulator) {
+ this.autoPushEfficiencyRegulator.close();
+ }
+ }
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the current stop state of this component.
+ *
+ * Thread-safe: reads the internal stop flag under a lock. The method returns true when the
+ * component has been stopped (the internal {@code stop} flag is set), and false otherwise.
+ *
+ * @return {@code true} if the component is stopped; {@code false} if it is not stopped
+ */
+ @Override
+ public boolean isRunning() {
+ this.lock.lock();
+ try {
+ return this.stop;
+ } finally {
+ this.lock.unlock();
+ }
+ }
+
+ /**
+ * Returns the current AutoPushEfficiencyRegulator instance.
+ *
+ * Intended for testing; may return null if automatic push efficiency is not active
+ * or the regulator has been closed.
+ *
+ * @return the active AutoPushEfficiencyRegulator, or {@code null} if none is present
+ */
+ @VisibleForTesting
+ public AutoPushEfficiencyRegulator getAutoPushEfficiencyRegulator() {
+ return autoPushEfficiencyRegulator;
+ }
+
+ /**
+ * Test-only setter that replaces the ChangeProcessor instance used by this updater.
+ */
+ @VisibleForTesting
+ public void setChangeProcessor(ChangeProcessor changeProcessor) {
+ this.changeProcessor = changeProcessor;
+ }
+
+ /**
+ * Replaces the current PushProcessor instance. Intended for testing to inject a mock or stub.
+ */
+ @VisibleForTesting
+ public void setPushProcessor(PushProcessor pushProcessor) {
+ this.pushProcessor = pushProcessor;
+ }
+
+ /**
+ * Test hook to replace the FirePushService instance used by this component.
+ *
+ * Intended for use in unit tests to inject a mock or alternative implementation.
+ */
+ @VisibleForTesting
+ public void setFirePushService(FirePushService firePushService) {
+ this.firePushService = firePushService;
+ }
+
+ /**
+ * Test hook to replace the ClientManagerResource used by this updater.
+ *
+ * Intended for use in tests to inject a mock or stub implementation.
+ */
+ @VisibleForTesting
+ public void setClientManagerResource(ClientManagerResource clientManagerResource) {
+ this.clientManagerResource = clientManagerResource;
+ }
+}
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java
index a897c5f72..332a1fffb 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushEfficiencyImproveConfig.java
@@ -22,6 +22,7 @@
import java.util.Set;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.builder.ToStringBuilder;
/**
* @author jiangcun.hlc@antfin.com
@@ -80,6 +81,9 @@ public class PushEfficiencyImproveConfig {
/** session 处理 pushTask delay pushTaskDebouncingMillis 时间处理,可以合并相同的推送任务,避免数据连续变化触发大量推送, 默认500ms */
private int sbfAppPushTaskDebouncingMillis = DEFAULT_PUSH_TASK_DEBOUNCING_MILLIS;
+ /** 自动优化的相关配置 */
+ private AutoPushEfficiencyConfig autoPushEfficiencyConfig = null;
+
/**
* 判断是否满足 三板斧灰度条件
*
@@ -247,10 +251,51 @@ public boolean isRegWorkWake() {
return regWorkWake;
}
+ /**
+ * Enable or disable "reg work" wake behavior for apps under SBF (push efficiency) control.
+ *
+ * When set to true, regWork tasks for apps that match SBF rules will use wake semantics; when false,
+ * they will not.
+ *
+ * @param regWorkWake true to enable regWork wake behavior for SBF apps, false to disable
+ */
public void setRegWorkWake(boolean regWorkWake) {
this.regWorkWake = regWorkWake;
}
+ /**
+ * Returns the configured AutoPushEfficiencyConfig controlling automatic push-efficiency behavior.
+ *
+ * @return the AutoPushEfficiencyConfig instance, or {@code null} if automatic push-efficiency is not configured
+ */
+ public AutoPushEfficiencyConfig getAutoPushEfficiencyConfig() {
+ return autoPushEfficiencyConfig;
+ }
+
+ /**
+ * Set the automatic push-efficiency configuration used by this instance.
+ *
+ * Passing null disables the automatic push-efficiency behavior for this configuration.
+ */
+ public void setAutoPushEfficiencyConfig(AutoPushEfficiencyConfig autoPushEfficiencyConfig) {
+ this.autoPushEfficiencyConfig = autoPushEfficiencyConfig;
+ }
+
+ /**
+ * Apply session server overrides to this config.
+ *
+ * If {@code sessionServerConfig} is non-null and its {@code sessionServerRegion} is non-blank,
+ * sets this instance's CURRENT_ZONE (uppercased) and replaces the default debounce values
+ * with the corresponding values from {@code sessionServerConfig}.
+ *
+ * Affected fields when applied:
+ * - CURRENT_ZONE (uppercased {@code sessionServerRegion})
+ * - DEFAULT_CHANGE_DEBOUNCING_MILLIS (from {@code getDataChangeDebouncingMillis()})
+ * - DEFAULT_CHANGE_DEBOUNCING_MAX_MILLIS (from {@code getDataChangeMaxDebouncingMillis()})
+ * - DEFAULT_PUSH_TASK_DEBOUNCING_MILLIS (from {@code getPushDataTaskDebouncingMillis()})
+ *
+ * @param sessionServerConfig configuration from the session server; ignored if null or its region is blank
+ */
public void setSessionServerConfig(SessionServerConfig sessionServerConfig) {
if (null != sessionServerConfig
&& StringUtils.isNotBlank(sessionServerConfig.getSessionServerRegion())) {
@@ -270,46 +315,15 @@ public boolean validate() {
return true;
}
+ /**
+ * Returns a string representation of this configuration.
+ *
+ * Produces a reflection-based representation that includes the values of this object's fields.
+ *
+ * @return a string containing the reflected field names and values for this instance
+ */
@Override
public String toString() {
- return "PushEfficiencyImproveConfig{"
- + "CURRENT_ZONE='"
- + CURRENT_ZONE
- + '\''
- + ", CURRENT_IP="
- + CURRENT_IP
- + ", inIpZoneSBF="
- + inIpZoneSBF()
- + ", DEFAULT_CHANGE_DEBOUNCING_MILLIS="
- + DEFAULT_CHANGE_DEBOUNCING_MILLIS
- + ", DEFAULT_CHANGE_DEBOUNCING_MAX_MILLIS="
- + DEFAULT_CHANGE_DEBOUNCING_MAX_MILLIS
- + ", DEFAULT_PUSH_TASK_DEBOUNCING_MILLIS="
- + DEFAULT_PUSH_TASK_DEBOUNCING_MILLIS
- + ", changeDebouncingMillis="
- + changeDebouncingMillis
- + ", changeDebouncingMaxMillis="
- + changeDebouncingMaxMillis
- + ", changeTaskWaitingMillis="
- + changeTaskWaitingMillis
- + ", pushTaskWaitingMillis="
- + pushTaskWaitingMillis
- + ", pushTaskDebouncingMillis="
- + pushTaskDebouncingMillis
- + ", regWorkWaitingMillis="
- + regWorkWaitingMillis
- + ", ipSet="
- + ipSet
- + ", zoneSet="
- + zoneSet
- + ", subAppSet="
- + subAppSet
- + ", sbfAppPushTaskDebouncingMillis="
- + sbfAppPushTaskDebouncingMillis
- + ", pushTaskWake="
- + pushTaskWake
- + ", regWorkWake="
- + regWorkWake
- + '}';
+ return ToStringBuilder.reflectionToString(this);
}
}
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java
index 6f4ac35a2..f8c348c83 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/push/PushProcessor.java
@@ -67,7 +67,9 @@ public class PushProcessor {
@Autowired protected ClientNodeService clientNodeService;
- @Autowired protected CircuitBreakerService circuitBreakerService;;
+ @Autowired protected CircuitBreakerService circuitBreakerService;
+
+ private volatile AutoPushEfficiencyRegulator autoPushEfficiencyRegulator;
private int pushDataTaskDebouncingMillis = 500;
private PushEfficiencyImproveConfig pushEfficiencyImproveConfig;
@@ -90,6 +92,13 @@ public void init() {
ConcurrentUtils.createDaemonThread("PushCleaner", cleaner).start();
}
+ /**
+ * Update push task timing from the provided configuration.
+ *
+ * Applies the config's push task waiting and debouncing millis to the internal task buffer and processor state.
+ *
+ * @param pushEfficiencyImproveConfig configuration supplying `pushTaskWaitingMillis` and `pushTaskDebouncingMillis`
+ */
public void setPushTaskDelayTime(PushEfficiencyImproveConfig pushEfficiencyImproveConfig) {
this.taskBuffer.setPushTaskWorkWaitingMillis(
pushEfficiencyImproveConfig.getPushTaskWaitingMillis());
@@ -97,6 +106,23 @@ public void setPushTaskDelayTime(PushEfficiencyImproveConfig pushEfficiencyImpro
this.pushEfficiencyImproveConfig = pushEfficiencyImproveConfig;
}
+ /**
+ * Injects an AutoPushEfficiencyRegulator to enable automatic push-efficiency accounting.
+ *
+ * The regulator, if non-null, will be consulted by push logic (doPush) to increment
+ * push counts for efficiency regulation. Passing null clears the regulator.
+ */
+ public void setAutoPushEfficiencyRegulator(
+ AutoPushEfficiencyRegulator autoPushEfficiencyRegulator) {
+ this.autoPushEfficiencyRegulator = autoPushEfficiencyRegulator;
+ }
+
+ /**
+ * Lazily initializes the PushTaskBuffer used to hold pending push tasks.
+ *
+ * If a buffer is not already present, creates a new PushTaskBuffer using the
+ * configured bucket size from sessionServerConfig.
+ */
void intTaskBuffer() {
if (this.taskBuffer == null) {
this.taskBuffer = new PushTaskBuffer(sessionServerConfig.getPushTaskBufferBucketSize());
@@ -307,6 +333,26 @@ protected boolean interruptOnPushEmpty(
return false;
}
+ /**
+ * Attempts to execute the given push task: validates push permission and task state, prepares push
+ * payload, and initiates a remote push with a callback.
+ *
+ * Returns true when the push was successfully initiated and recorded; returns false if the push
+ * was skipped (disabled by IP/data-center switch, a conflicting/obsolete in-flight task, the task
+ * determined it should not continue, it was interrupted due to an empty payload condition, the
+ * push-empty check decided to skip, or an exception occurred).
+ *
+ * Side effects when returning true:
+ * - starts the task push trace,
+ * - records the push in the in-flight pushingRecords map,
+ * - may increment the AutoPushEfficiencyRegulator push counter if one is configured,
+ * - invokes the client node service to perform the remote push and registers a callback,
+ * - increments the PUSH_CLIENT_ING_COUNTER.
+ *
+ * @param task the push task to execute
+ * @return true if the push was initiated and recorded; false if the push was skipped or failed to
+ * start
+ */
boolean doPush(PushTask task) {
if (!pushSwitchService.canIpPushMulti(
task.pushingTaskKey.addr.getAddress().getHostAddress(), task.datum.dataCenters())) {
@@ -340,6 +386,13 @@ boolean doPush(PushTask task) {
return false;
}
+ // 如果需要则进行推送计数,以便于可以自动化调整攒批
+ AutoPushEfficiencyRegulator currentAutoPushEfficiencyRegulator =
+ this.autoPushEfficiencyRegulator;
+ if (null != currentAutoPushEfficiencyRegulator) {
+ currentAutoPushEfficiencyRegulator.safeIncrementPushCount();
+ }
+
pushingRecords.put(
task.pushingTaskKey,
new PushRecord(
diff --git a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java
index b9fde3322..7bc04da46 100644
--- a/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java
+++ b/server/server/session/src/main/java/com/alipay/sofa/registry/server/session/resource/ClientManagerResource.java
@@ -78,11 +78,17 @@ public class ClientManagerResource {
@Autowired protected ExecutorManager executorManager;
+ private volatile boolean enableTrafficOperate = true;
+
/**
- * Client off
+ * Turns off (disconnects or disables traffic for) client connections for the given IPs.
+ *
+ * Accepts a single string of one or more IP addresses (e.g., comma- or whitespace-separated) which will be parsed into a set of IPs. For each matching connection, the method looks up ConnectId values and instructs the session registry to "client off" those connections.
+ *
+ * If the input is empty the call fails. If traffic operations are globally disabled via the feature flag, the call returns a failure indicating rate limiting.
*
- * @param ips ips
- * @return CommonResponse
+ * @param ips a string containing one or more IP addresses to target (will be parsed into a set)
+ * @return a CommonResponse indicating success, or a failure with a message explaining the reason
*/
@POST
@Path("/clientOff")
@@ -90,6 +96,12 @@ public CommonResponse clientOff(@FormParam("ips") String ips) {
if (StringUtils.isEmpty(ips)) {
return CommonResponse.buildFailedResponse("ips is empty");
}
+
+ if (!this.enableTrafficOperate) {
+ // 限流,不允许操作开关流
+ return CommonResponse.buildFailedResponse("too many request");
+ }
+
final Set Validates the input and a feature-flag that gates traffic operations. If local enable succeeds,
+ * the method propagates the same "client open" request to other console servers in the same zone
+ * and returns a failure if any remote server reports failure.
*
- * @param ips ips
- * @return CommonResponse
+ * @param ips comma-separated list of client IP addresses to open
+ * @return a CommonResponse indicating success or failure (validation failure, traffic operations
+ * disabled, or any remote-server failure)
*/
@POST
@Path("/zone/clientOpen")
@@ -164,6 +204,12 @@ public CommonResponse clientOnInZone(@FormParam("ips") String ips) {
if (StringUtils.isEmpty(ips)) {
return CommonResponse.buildFailedResponse("ips is empty");
}
+
+ if (!this.enableTrafficOperate) {
+ // 限流,不允许操作开关流
+ return CommonResponse.buildFailedResponse("too many request");
+ }
+
CommonResponse resp = clientOn(ips);
if (!resp.isSuccess()) {
return resp;
@@ -235,7 +281,34 @@ public Map Validates the provided authentication token. If authentication succeeds, retrieves a map of
+ * change debouncing times from the ChangeProcessor and returns it in a successful GenericResponse.
+ * If authentication fails or an error occurs, returns a failed GenericResponse with an error
+ * message.
+ *
+ * @param token authentication token passed in the HTTP header
+ * @return a GenericResponse whose data (on success) is a Map from String to arrays of
+ * ChangeDebouncingTime; on failure the response contains an error message
+ */
+ @GET
+ @Path("/getChangeDebouncingMillis")
+ @Produces(MediaType.APPLICATION_JSON)
+ public GenericResponse