Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,13 @@
import com.github.rholder.retry.WaitStrategies;
import com.google.common.base.Predicate;
import java.lang.annotation.Annotation;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Resource;
import javax.ws.rs.Path;
import javax.ws.rs.ext.Provider;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.glassfish.jersey.server.ResourceConfig;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -233,15 +232,36 @@ private void renewNode() {
metaServerConfig.getSchedulerHeartbeatIntervalSecs() * 1000);
}

/**
* Opens and starts the Bolt session register server if it hasn't been started.
*
* <p>Merges the built-in sessionServerHandlers with any handlers returned by
* {@link #customSessionServerHandlers()}, opens a server bound to the local address and the
* configured session port via {@code boltExchange}, and stores the server instance in
* {@code sessionServer}. The method sets the internal started flag to prevent reinitialization.
*
* @throws RuntimeException if the server fails to open; the started flag is reset before
* rethrowing.
*/
private void openSessionRegisterServer() {
try {
if (rpcServerForSessionStarted.compareAndSet(false, true)) {
List<AbstractServerHandler> mergedSessionServerHandlers =
new ArrayList<>(this.sessionServerHandlers);

Collection<AbstractServerHandler> customSessionServerHandlers =
this.customSessionServerHandlers();
if (CollectionUtils.isNotEmpty(customSessionServerHandlers)) {
mergedSessionServerHandlers.addAll(this.customSessionServerHandlers());
}

sessionServer =
boltExchange.open(
new URL(
NetUtil.getLocalAddress().getHostAddress(),
metaServerConfig.getSessionServerPort()),
sessionServerHandlers.toArray(new ChannelHandler[sessionServerHandlers.size()]));
mergedSessionServerHandlers.toArray(
new ChannelHandler[mergedSessionServerHandlers.size()]));

LOGGER.info(
"Open session node register server port {} success!",
Expand All @@ -257,6 +277,26 @@ private void openSessionRegisterServer() {
}
}

/**
* Hook for subclasses to supply additional session server handlers.
*
* Subclasses may override to return extra AbstractServerHandler instances that will be merged
* with the bootstrap's default session handlers when opening the session register server.
*
* @return a non-null collection of additional session server handlers (default: empty list)
*/
protected Collection<AbstractServerHandler> customSessionServerHandlers() {
return Collections.emptyList();
}

/**
* Opens the Bolt data-register RPC server on the configured data port if it is not already started.
*
* <p>This method is idempotent: it uses an atomic compare-and-set to ensure the server is opened only once.
* On success, the created server is stored in the `dataServer` field and the started flag is set.
*
* @throws RuntimeException if the server fails to open; the underlying exception is wrapped.
*/
private void openDataRegisterServer() {
try {
if (rpcServerForDataStarted.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,10 +410,25 @@ public MetricsResource metricsResource() {
return new MetricsResource();
}

/**
* Creates and exposes a RegistryCoreOpsResource Spring bean.
*
* @return a new {@link RegistryCoreOpsResource} instance managed by the Spring container
*/
@Bean
public RegistryCoreOpsResource registryCoreOpsResource() {
return new RegistryCoreOpsResource();
}

/**
* Creates and registers a DataCenterResource as a Spring bean.
*
* <p>Exposes a new instance of DataCenterResource for dependency injection in the application context.
*/
@Bean
public DataCenterResource dataCenterResource() {
return new DataCenterResource();
}
}

@Configuration
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.registry.server.meta.resource;

import com.alipay.sofa.registry.core.model.Result;
import com.alipay.sofa.registry.server.meta.bootstrap.config.MetaServerConfig;
import com.alipay.sofa.registry.server.meta.resource.filter.LeaderAwareRestController;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/**
* @author huicha
* @date 2025/9/22
*/
@Path("datacenter")
@LeaderAwareRestController
public class DataCenterResource {

private Logger LOGGER = LoggerFactory.getLogger(DataCenterResource.class);

@Autowired private MetaServerConfig metaServerConfig;

/**
* Returns the local data center identifier wrapped in a Result.
*
* <p>On success returns Result.success() with the local data center string in the result message.
* On failure returns a failed Result with message "Query meta local datacenter exception".
*
* @return a Result whose message contains the local data center on success, or a failed Result on error
*/
@GET
@Path("query")
@Produces(MediaType.APPLICATION_JSON)
public Result queryBlackList() {
try {
String localDataCenter = this.metaServerConfig.getLocalDataCenter();
Result result = Result.success();
result.setMessage(localDataCenter);
return result;
} catch (Throwable throwable) {
LOGGER.error("Query meta local datacenter exception", throwable);
return Result.failed("Query meta local datacenter exception");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,8 @@
import com.alipay.sofa.registry.util.StringFormatter;
import com.google.common.collect.Sets;
import java.util.Locale;
import javax.ws.rs.FormParam;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import java.util.Set;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
Expand All @@ -51,6 +47,14 @@ public class MultiClusterSyncResource {

@Autowired private MultiClusterSyncRepository multiClusterSyncRepository;

/**
* Retrieves the MultiClusterSyncInfo for the given remote data center.
*
* If {@code remoteDataCenter} is blank, the response is a failed GenericResponse with an explanatory message.
*
* @param remoteDataCenter the remote data center identifier to query
* @return a GenericResponse containing the found MultiClusterSyncInfo on success, or a failed response when the input is blank or no info is found
*/
@GET
@Path("query")
@Produces(MediaType.APPLICATION_JSON)
Expand All @@ -66,6 +70,37 @@ public GenericResponse<MultiClusterSyncInfo> query(
return response;
}

/**
* Returns all local multi-cluster synchronization configurations.
*
* <p>Fetches all locally stored MultiClusterSyncInfo entries and returns them wrapped in a
* GenericResponse. If no entries exist, the response contains an empty set.
*
* @return a GenericResponse whose data is a Set of MultiClusterSyncInfo representing local sync
* configurations
*/
@GET
@Path("queryAll")
@Produces(MediaType.APPLICATION_JSON)
public GenericResponse<Set<MultiClusterSyncInfo>> queryAll() {
GenericResponse<Set<MultiClusterSyncInfo>> response = new GenericResponse();
Set<MultiClusterSyncInfo> queryResult = multiClusterSyncRepository.queryLocalSyncInfos();
response.fillSucceed(queryResult);
return response;
}

/**
* Create and persist a new MultiClusterSyncInfo for the given remote data center.
*
* The endpoint authenticates the request using the provided token, validates inputs,
* constructs a new MultiClusterSyncInfo (with sync enabled and push disabled by default),
* and inserts it into persistent storage.
*
* @param remoteDataCenter identifier of the remote data center to configure; must not be blank
* @param remoteMetaAddress meta server address of the remote data center; must not be blank
* @param token authentication token used for authorizing the operation
* @return a CommonResponse whose success flag is true when the new configuration was persisted
*/
@POST
@Path("/save")
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -470,6 +505,17 @@ public CommonResponse removeSyncGroup(
return response;
}

/**
* Remove an existing multi-cluster sync configuration for the given remote data center.
*
* <p>The removal is performed only if the provided expectVersion matches the stored
* configuration's data version and sync is currently disabled for that remote data center.
*
* @param remoteDataCenter the identifier of the remote data center to remove
* @param expectVersion the expected current data version (used for optimistic concurrency control)
* @param token authentication token required to authorize this operation
* @return a CommonResponse whose success flag is true when a configuration was removed, false otherwise
*/
@POST
@Path("/remove")
@Produces(MediaType.APPLICATION_JSON)
Expand Down Expand Up @@ -517,4 +563,128 @@ public CommonResponse removeConfig(
response.setSuccess(ret > 0);
return response;
}

/**
* Add comma-separated dataInfoIds to the ignore list for a remote data center.
*
* Validates the provided admin token, ensures inputs are non-empty and the supplied
* expectVersion matches the stored data version, then updates the MultiClusterSyncInfo
* by adding the given IDs to its ignoreDataInfoIds set, increments the data version,
* and persists the change.
*
* @param remoteDataCenter identifier of the remote data center whose config will be modified
* @param ignoreDataInfoIds comma-separated dataInfoId values to add to the ignore list
* @param token admin token used for authentication
* @param expectVersion expected current data version (used for optimistic concurrency)
* @return CommonResponse with success=true if the update was persisted, false otherwise
*/
@POST
@Path("/sync/ignoreDataInfoIds/add")
@Produces(MediaType.APPLICATION_JSON)
public CommonResponse addIgnoreSyncDataInfoIds(
@FormParam("remoteDataCenter") String remoteDataCenter,
@FormParam("ignoreDataInfoIds") String ignoreDataInfoIds,
@FormParam("token") String token,
@FormParam("expectVersion") String expectVersion) {
if (!AuthChecker.authCheck(token)) {
LOG.error(
"add ignoreDataInfoIds, remoteDataCenter={}, ignoreDataInfoIds={}, auth check={} fail!",
remoteDataCenter,
ignoreDataInfoIds,
token);
return GenericResponse.buildFailedResponse("auth check fail");
}

if (StringUtils.isBlank(remoteDataCenter)
|| StringUtils.isBlank(ignoreDataInfoIds)
|| StringUtils.isBlank(expectVersion)) {
return CommonResponse.buildFailedResponse(
"remoteDataCenter, ignoreDataInfoIds, expectVersion is not allow empty.");
}

MultiClusterSyncInfo exist = multiClusterSyncRepository.query(remoteDataCenter);

if (exist == null || exist.getDataVersion() != Long.parseLong(expectVersion)) {
return CommonResponse.buildFailedResponse(
StringFormatter.format(
"remoteDataCenter:{}, expectVersion:{} not exist.", remoteDataCenter, expectVersion));
}

exist.getIgnoreDataInfoIds().addAll(Sets.newHashSet(ignoreDataInfoIds.split(",")));
exist.setDataVersion(PersistenceDataBuilder.nextVersion());
boolean ret = multiClusterSyncRepository.update(exist, NumberUtils.toLong(expectVersion));

LOG.info(
"[addIgnoreSyncDataInfoIds]result:{}, remoteDataCenter:{}, ignoreDataInfoIds:{}, expectVersion:{}",
ret,
remoteDataCenter,
ignoreDataInfoIds,
expectVersion);

CommonResponse response = new CommonResponse();
response.setSuccess(ret);
return response;
}

/**
* Removes the given comma-separated ignore data-info IDs from the ignore list of the
* MultiClusterSyncInfo for the specified remote data center, increments the data version,
* and persists the change.
*
* Performs an authentication check, validates inputs and the expected data version before
* applying the removal. If authentication fails, inputs are invalid, or the expected version
* does not match the stored version, a failed CommonResponse is returned and no change is made.
*
* @param remoteDataCenter target remote data center identifier
* @param ignoreDataInfoIds comma-separated data-info IDs to remove from the ignore set
* @param token authentication token used for authorization
* @param expectVersion expected current data version (used for optimistic concurrency)
* @return a CommonResponse whose success flag is true when the update was persisted
*/
@POST
@Path("/sync/ignoreDataInfoIds/remove")
@Produces(MediaType.APPLICATION_JSON)
public CommonResponse removeIgnoreDataInfoIds(
@FormParam("remoteDataCenter") String remoteDataCenter,
@FormParam("ignoreDataInfoIds") String ignoreDataInfoIds,
@FormParam("token") String token,
@FormParam("expectVersion") String expectVersion) {
if (!AuthChecker.authCheck(token)) {
LOG.error(
"remove ignoreDataInfoIds, remoteDataCenter={}, ignoreDataInfoIds={}, auth check={} fail!",
remoteDataCenter,
ignoreDataInfoIds,
token);
return GenericResponse.buildFailedResponse("auth check fail");
}
if (StringUtils.isBlank(remoteDataCenter)
|| StringUtils.isBlank(ignoreDataInfoIds)
|| StringUtils.isBlank(expectVersion)) {
return CommonResponse.buildFailedResponse(
"remoteDataCenter, ignoreDataInfoIds, expectVersion is not allow empty.");
}

MultiClusterSyncInfo exist = multiClusterSyncRepository.query(remoteDataCenter);

if (exist == null || exist.getDataVersion() != Long.parseLong(expectVersion)) {
return CommonResponse.buildFailedResponse(
StringFormatter.format(
"remoteDataCenter:{}, expectVersion:{} not exist.", remoteDataCenter, expectVersion));
}

exist.getIgnoreDataInfoIds().removeAll(Sets.newHashSet(ignoreDataInfoIds.split(",")));
exist.setDataVersion(PersistenceDataBuilder.nextVersion());
boolean ret = multiClusterSyncRepository.update(exist, NumberUtils.toLong(expectVersion));

LOG.info(
"[removeIgnoreDataInfoIds]result:{}, remoteDataCenter:{}, ignoreDataInfoIds:{}, expectVersion:{}",
ret,
remoteDataCenter,
ignoreDataInfoIds,
expectVersion);

CommonResponse response = new CommonResponse();
response.setSuccess(ret);
return response;
}
}
Loading
Loading