Skip to content

Commit 36f577a

Browse files
authored
Merge branch 'master' into feature/table-summary-without-optimizing
2 parents 0d41056 + bb14041 commit 36f577a

36 files changed

Lines changed: 2164 additions & 306 deletions

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 17 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -32,6 +32,8 @@
3232
import org.apache.amoro.config.Configurations;
3333
import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
3434
import org.apache.amoro.exception.AmoroRuntimeException;
35+
import org.apache.amoro.process.ActionCoordinator;
36+
import org.apache.amoro.process.ProcessFactory;
3537
import org.apache.amoro.server.catalog.CatalogManager;
3638
import org.apache.amoro.server.catalog.DefaultCatalogManager;
3739
import org.apache.amoro.server.dashboard.DashboardServer;
@@ -47,16 +49,18 @@
4749
import org.apache.amoro.server.persistence.HttpSessionHandlerFactory;
4850
import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
4951
import org.apache.amoro.server.process.ProcessService;
52+
import org.apache.amoro.server.process.ProcessService.ExecuteEngineManager;
53+
import org.apache.amoro.server.process.TableProcessFactoryManager;
5054
import org.apache.amoro.server.resource.ContainerMetadata;
5155
import org.apache.amoro.server.resource.Containers;
5256
import org.apache.amoro.server.resource.DefaultOptimizerManager;
5357
import org.apache.amoro.server.resource.OptimizerManager;
5458
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
5559
import org.apache.amoro.server.table.DefaultTableManager;
60+
import org.apache.amoro.server.table.DefaultTableRuntimeFactory;
5661
import org.apache.amoro.server.table.DefaultTableService;
5762
import org.apache.amoro.server.table.RuntimeHandlerChain;
5863
import org.apache.amoro.server.table.TableManager;
59-
import org.apache.amoro.server.table.TableRuntimeFactoryManager;
6064
import org.apache.amoro.server.table.TableService;
6165
import org.apache.amoro.server.terminal.TerminalManager;
6266
import org.apache.amoro.server.utils.ThriftServiceProxy;
@@ -167,7 +171,7 @@ public static void main(String[] args) {
167171
}
168172

169173
public void registAndElect() throws Exception {
170-
haContainer.registAndElect();
174+
haContainer.registerAndElect();
171175
}
172176

173177
public enum HAState {
@@ -229,17 +233,22 @@ public void transitionToFollower() {
229233
}
230234

231235
public void startOptimizingService() throws Exception {
232-
TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager();
233-
tableRuntimeFactoryManager.initialize();
236+
// Load process factories and build action coordinators from default table runtime factory.
237+
TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager();
238+
tableProcessFactoryManager.initialize();
239+
List<ProcessFactory> processFactories = tableProcessFactoryManager.installedPlugins();
234240

235-
tableService =
236-
new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager);
241+
DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory();
242+
defaultRuntimeFactory.initialize(processFactories);
237243

244+
List<ActionCoordinator> actionCoordinators = defaultRuntimeFactory.supportedCoordinators();
245+
ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
246+
247+
tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory);
248+
processService = new ProcessService(tableService, actionCoordinators, executeEngineManager);
238249
optimizingService =
239250
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
240251

241-
processService = new ProcessService(serviceConfig, tableService);
242-
243252
LOG.info("Setting up AMS table executors...");
244253
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
245254
addHandlerChain(optimizingService.getTableRuntimeHandler());

amoro-ams/src/main/java/org/apache/amoro/server/DefaultOptimizingService.java

Lines changed: 5 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -490,16 +490,17 @@ public void run() {
490490
OptimizerKeepingTask keepingTask = suspendingQueue.take();
491491
String token = keepingTask.getToken();
492492
boolean isExpired = !keepingTask.tryKeeping();
493+
if (isExpired) {
494+
LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer());
495+
unregisterOptimizer(token);
496+
}
493497
Optional.ofNullable(keepingTask.getQueue())
494498
.ifPresent(
495499
queue ->
496500
queue
497501
.collectTasks(buildSuspendingPredication(authOptimizers.keySet()))
498502
.forEach(task -> retryTask(task, queue)));
499-
if (isExpired) {
500-
LOG.info("Optimizer {} has been expired, unregister it", keepingTask.getOptimizer());
501-
unregisterOptimizer(token);
502-
} else {
503+
if (!isExpired) {
503504
LOG.debug("Optimizer {} is being touched, keep it", keepingTask.getOptimizer());
504505
keepInTouch(keepingTask.getOptimizer());
505506
}

amoro-ams/src/main/java/org/apache/amoro/server/ha/DataBaseHighAvailabilityContainer.java

Lines changed: 74 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,8 @@
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

31+
import java.util.ArrayList;
32+
import java.util.List;
3133
import java.util.UUID;
3234
import java.util.concurrent.CountDownLatch;
3335
import java.util.concurrent.Executors;
@@ -135,6 +137,44 @@ public void waitFollowerShip() throws InterruptedException {
135137
LOG.info("Became the follower of AMS (Database lease)");
136138
}
137139

140+
@Override
141+
public void registerAndElect() throws Exception {
142+
boolean isMasterSlaveMode = serviceConfig.getBoolean(AmoroManagementConf.USE_MASTER_SLAVE_MODE);
143+
if (!isMasterSlaveMode) {
144+
LOG.debug("Master-slave mode is not enabled, skip node registration");
145+
return;
146+
}
147+
// In master-slave mode, register node to database by writing OPTIMIZING_SERVICE info
148+
// This is similar to ZK mode registering ephemeral nodes
149+
long now = System.currentTimeMillis();
150+
String optimizingInfoJson = JacksonUtil.toJSONString(optimizingServiceServerInfo);
151+
try {
152+
doAsIgnoreError(
153+
HaLeaseMapper.class,
154+
mapper -> {
155+
int updated =
156+
mapper.updateServerInfo(
157+
clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now);
158+
if (updated == 0) {
159+
mapper.insertServerInfoIfAbsent(
160+
clusterName, OPTIMIZING_SERVICE, nodeId, nodeIp, optimizingInfoJson, now);
161+
}
162+
});
163+
LOG.info(
164+
"Registered AMS node to database: nodeId={}, optimizingService={}",
165+
nodeId,
166+
optimizingServiceServerInfo);
167+
} catch (Exception e) {
168+
LOG.error("Failed to register node to database", e);
169+
throw e;
170+
}
171+
}
172+
173+
@Override
174+
public boolean hasLeadership() {
175+
return isLeader.get();
176+
}
177+
138178
/** Closes the heartbeat executor safely. */
139179
@Override
140180
public void close() {
@@ -147,9 +187,6 @@ public void close() {
147187
}
148188
}
149189

150-
@Override
151-
public void registAndElect() throws Exception {}
152-
153190
private class HeartbeatRunnable implements Runnable {
154191
@Override
155192
public void run() {
@@ -304,6 +341,40 @@ private void onLeaderLost() {
304341
}
305342
}
306343

344+
@Override
345+
public List<AmsServerInfo> getAliveNodes() {
346+
List<AmsServerInfo> aliveNodes = new ArrayList<>();
347+
if (!isLeader.get()) {
348+
LOG.warn("Only leader node can get alive nodes list");
349+
return aliveNodes;
350+
}
351+
try {
352+
long currentTime = System.currentTimeMillis();
353+
List<HaLeaseMeta> leases =
354+
getAs(
355+
HaLeaseMapper.class,
356+
mapper -> mapper.selectLeasesByService(clusterName, OPTIMIZING_SERVICE));
357+
for (HaLeaseMeta lease : leases) {
358+
// Only include nodes with valid (non-expired) leases
359+
if (lease.getLeaseExpireTs() != null && lease.getLeaseExpireTs() > currentTime) {
360+
if (lease.getServerInfoJson() != null && !lease.getServerInfoJson().isEmpty()) {
361+
try {
362+
AmsServerInfo nodeInfo =
363+
JacksonUtil.parseObject(lease.getServerInfoJson(), AmsServerInfo.class);
364+
aliveNodes.add(nodeInfo);
365+
} catch (Exception e) {
366+
LOG.warn("Failed to parse server info for node {}", lease.getNodeId(), e);
367+
}
368+
}
369+
}
370+
}
371+
} catch (Exception e) {
372+
LOG.error("Failed to get alive nodes from database", e);
373+
throw e;
374+
}
375+
return aliveNodes;
376+
}
377+
307378
private AmsServerInfo buildServerInfo(String host, int thriftBindPort, int restBindPort) {
308379
AmsServerInfo amsServerInfo = new AmsServerInfo();
309380
amsServerInfo.setHost(host);

amoro-ams/src/main/java/org/apache/amoro/server/ha/HighAvailabilityContainer.java

Lines changed: 19 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,10 @@
1818

1919
package org.apache.amoro.server.ha;
2020

21+
import org.apache.amoro.client.AmsServerInfo;
22+
23+
import java.util.List;
24+
2125
/**
2226
* Common interface for high availability (HA) containers.
2327
*
@@ -49,5 +53,19 @@ public interface HighAvailabilityContainer {
4953
*
5054
* @throws Exception If registration fails or participation in the primary election fails.
5155
*/
52-
void registAndElect() throws Exception;
56+
void registerAndElect() throws Exception;
57+
58+
/**
59+
* Used in master-slave mode to obtain information about all currently registered AMS nodes.
60+
*
61+
* @return information about all currently registered AMS nodes, as a {@code List<AmsServerInfo>}
62+
*/
63+
List<AmsServerInfo> getAliveNodes();
64+
65+
/**
66+
* Used to determine whether the current AMS node is the primary node.
67+
*
68+
* @return true if the current AMS node is the primary node, false otherwise
69+
*/
70+
boolean hasLeadership();
5371
}

amoro-ams/src/main/java/org/apache/amoro/server/ha/NoopHighAvailabilityContainer.java

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -18,9 +18,11 @@
1818

1919
package org.apache.amoro.server.ha;
2020

21+
import org.apache.amoro.client.AmsServerInfo;
2122
import org.slf4j.Logger;
2223
import org.slf4j.LoggerFactory;
2324

25+
import java.util.List;
2426
import java.util.concurrent.CountDownLatch;
2527

2628
/** No-op HA container that never blocks and performs no leader election. */
@@ -48,5 +50,15 @@ public void close() {
4850
}
4951

5052
@Override
51-
public void registAndElect() throws Exception {}
53+
public void registerAndElect() throws Exception {}
54+
55+
@Override
56+
public List<AmsServerInfo> getAliveNodes() {
57+
return List.of();
58+
}
59+
60+
@Override
61+
public boolean hasLeadership() {
62+
return false;
63+
}
5264
}

0 commit comments

Comments
 (0)