Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
da93168
Adds support for multiple managers running distributed fate
keith-turner Feb 19, 2026
952ddf3
remove done follow on comment
keith-turner Feb 20, 2026
3cee431
fix build issues
keith-turner Feb 20, 2026
a286363
add missing override
keith-turner Feb 20, 2026
ec19663
Merge branch 'main' into dist-fate2
keith-turner Feb 20, 2026
b25658e
remove hadoop import
keith-turner Feb 20, 2026
cef527e
fix build issues
keith-turner Feb 20, 2026
7740c3d
Merge branch 'main' into dist-fate2
keith-turner Feb 23, 2026
3671c58
remove unused import
keith-turner Feb 23, 2026
61dafcb
remove TODO
keith-turner Feb 23, 2026
ce34259
Merge branch 'main' into dist-fate2
keith-turner Feb 24, 2026
dfa9ec0
fix compile problems after merge
keith-turner Feb 24, 2026
4fb4f66
regenerate thrift
keith-turner Feb 24, 2026
c43e851
Merge remote-tracking branch 'upstream/main' into dist-fate2
keith-turner Feb 24, 2026
a34d2e0
restores highly available service
keith-turner Feb 25, 2026
e888a68
Merge branch 'main' into dist-fate2
keith-turner Feb 25, 2026
dee3183
Merge branch 'highly-avail-service' into dist-fate2
keith-turner Feb 25, 2026
a0e0f3b
Collapsed to one thrift server and one server context
keith-turner Feb 26, 2026
8be2acf
validate ha service throws expected exception
keith-turner Feb 26, 2026
d8e9c95
remove boolean
keith-turner Feb 26, 2026
70d3b32
fix javadoc
keith-turner Feb 26, 2026
39ce232
Merge branch 'highly-avail-service' into dist-fate2
keith-turner Feb 26, 2026
d5670e6
fix bug
keith-turner Feb 27, 2026
55cc62a
Reorganized thrift code
keith-turner Feb 27, 2026
ba44b3a
improve error message
keith-turner Feb 27, 2026
76cf38d
Merge branch 'highly-avail-service' into dist-fate2
keith-turner Feb 27, 2026
10bc681
Merge remote-tracking branch 'upstream/main' into dist-fate2
keith-turner Feb 28, 2026
1f8d0f5
remove unused code
keith-turner Feb 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class Constants {

public static final String ZMANAGERS = "/managers";
public static final String ZMANAGER_LOCK = ZMANAGERS + "/lock";
public static final String ZMANAGER_ASSISTANT_LOCK = ZMANAGERS + "/assistants";
public static final String ZMANAGER_GOAL_STATE = ZMANAGERS + "/goal_state";
public static final String ZMANAGER_TICK = ZMANAGERS + "/tick";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1292,7 +1292,7 @@ private static Set<String> createPersistentWatcherPaths() {
Constants.ZMANAGER_LOCK, Constants.ZMINI_LOCK, Constants.ZMONITOR_LOCK,
Constants.ZNAMESPACES, Constants.ZRECOVERY, Constants.ZSSERVERS, Constants.ZTABLES,
Constants.ZTSERVERS, Constants.ZUSERS, RootTable.ZROOT_TABLET, Constants.ZTEST_LOCK,
Constants.ZRESOURCEGROUPS)) {
Constants.ZMANAGER_ASSISTANT_LOCK, Constants.ZRESOURCEGROUPS)) {
pathsToWatch.add(path);
}
return pathsToWatch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,21 @@ public FateTxStore<T> reserve(FateId fateId) {
EnumSet.of(TStatus.SUBMITTED, TStatus.FAILED_IN_PROGRESS);

@Override
public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsumer) {
public void runnable(Set<FatePartition> partitions, BooleanSupplier keepWaiting,
Consumer<FateIdStatus> idConsumer) {

if (partitions.isEmpty()) {
return;
}

AtomicLong seen = new AtomicLong(0);

while (keepWaiting.getAsBoolean() && seen.get() == 0) {
final long beforeCount = unreservedRunnableCount.getCount();
final boolean beforeDeferredOverflow = deferredOverflow.get();

try (Stream<FateIdStatus> inProgress = getTransactions(IN_PROGRESS_SET);
Stream<FateIdStatus> other = getTransactions(OTHER_RUNNABLE_SET)) {
try (Stream<FateIdStatus> inProgress = getTransactions(partitions, IN_PROGRESS_SET);
Stream<FateIdStatus> other = getTransactions(partitions, OTHER_RUNNABLE_SET)) {
// read the in progress transaction first and then everything else in order to process those
// first
var transactions = Stream.concat(inProgress, other);
Expand Down Expand Up @@ -200,6 +205,8 @@ public void runnable(BooleanSupplier keepWaiting, Consumer<FateIdStatus> idConsu
if (beforeCount == unreservedRunnableCount.getCount()) {
long waitTime = 5000;
synchronized (deferred) {
deferred.keySet().removeIf(
fateId -> partitions.stream().noneMatch(partition -> partition.contains(fateId)));
if (!deferred.isEmpty()) {
waitTime = deferred.values().stream()
.mapToLong(countDownTimer -> countDownTimer.timeLeft(TimeUnit.MILLISECONDS)).min()
Expand Down Expand Up @@ -240,9 +247,11 @@ public ReadOnlyFateTxStore<T> read(FateId fateId) {
}

@Override
public Map<FateId,FateReservation> getActiveReservations() {
return list().filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors
.toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow()));
public Map<FateId,FateReservation> getActiveReservations(Set<FatePartition> partitions) {
try (var stream = getTransactions(partitions, EnumSet.allOf(TStatus.class))) {
return stream.filter(entry -> entry.getFateReservation().isPresent()).collect(Collectors
.toMap(FateIdStatus::getFateId, entry -> entry.getFateReservation().orElseThrow()));
}
}

protected boolean isRunnable(TStatus status) {
Expand Down Expand Up @@ -289,6 +298,9 @@ protected void verifyLock(ZooUtil.LockID lockID, FateId fateId) {

protected abstract Stream<FateIdStatus> getTransactions(EnumSet<TStatus> statuses);

protected abstract Stream<FateIdStatus> getTransactions(Set<FatePartition> partitions,
EnumSet<TStatus> statuses);

protected abstract TStatus _getStatus(FateId fateId);

protected abstract Optional<FateKey> getKey(FateId fateId);
Expand Down Expand Up @@ -418,7 +430,8 @@ public interface FateIdGenerator {
FateId newRandomId(FateInstanceType instanceType);
}

protected void seededTx() {
@Override
public void seeded() {
unreservedRunnableCount.increment();
}

Expand Down
48 changes: 45 additions & 3 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
Expand All @@ -51,6 +52,8 @@
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.google.gson.JsonParser;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand All @@ -76,6 +79,7 @@ public class Fate<T> extends FateClient<T> {
private final AtomicBoolean keepRunning = new AtomicBoolean(true);
// Visible for FlakyFate test object
protected final Set<FateExecutor<T>> fateExecutors = new HashSet<>();
private Set<FatePartition> currentPartitions = Set.of();

public enum TxInfo {
FATE_OP, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
Expand Down Expand Up @@ -208,8 +212,10 @@ public void run() {
fe -> fe.getFateOps().equals(fateOps) && fe.getName().equals(fateExecutorName))) {
log.debug("[{}] Adding FateExecutor for {} with {} threads", store.type(), fateOps,
poolSize);
fateExecutors.add(
new FateExecutor<>(Fate.this, environment, fateOps, poolSize, fateExecutorName));
var fateExecutor =
new FateExecutor<>(Fate.this, environment, fateOps, poolSize, fateExecutorName);
fateExecutors.add(fateExecutor);
fateExecutor.setPartitions(currentPartitions);
}
}
}
Expand All @@ -233,7 +239,11 @@ private class DeadReservationCleaner implements Runnable {
@Override
public void run() {
if (keepRunning.get()) {
store.deleteDeadReservations();
Set<FatePartition> partitions;
synchronized (fateExecutors) {
partitions = currentPartitions;
}
store.deleteDeadReservations(partitions);
}
}
}
Expand Down Expand Up @@ -369,6 +379,17 @@ public AtomicInteger getNeedMoreThreadsWarnCount() {
return needMoreThreadsWarnCount;
}

public void seeded(Set<FatePartition> partitions) {
synchronized (fateExecutors) {
if (Sets.intersection(currentPartitions, partitions).isEmpty()) {
return;
}
}

log.trace("Notified of seeding for {}", partitions);
store.seeded();
}

/**
* Initiates shutdown of background threads that run fate operations and cleanup fate data and
* optionally waits on them. Leaves the fate object in a state where it can still update and read
Expand Down Expand Up @@ -432,6 +453,27 @@ public void close() {
store.close();
}

public Set<FatePartition> getPartitions() {
synchronized (fateExecutors) {
return currentPartitions;
}
}

public Set<FatePartition> setPartitions(Set<FatePartition> partitions) {
Objects.requireNonNull(partitions);
Preconditions.checkArgument(
partitions.stream().allMatch(
fp -> fp.start().getType() == store.type() && fp.end().getType() == store.type()),
"type mismatch type:%s partitions:%s", store.type(), partitions);

synchronized (fateExecutors) {
var old = currentPartitions;
currentPartitions = Set.copyOf(partitions);
fateExecutors.forEach(fe -> fe.setPartitions(currentPartitions));
return old;
}
}

private boolean anyFateExecutorIsAlive() {
synchronized (fateExecutors) {
return fateExecutors.stream().anyMatch(FateExecutor::isAlive);
Expand Down
29 changes: 28 additions & 1 deletion core/src/main/java/org/apache/accumulo/core/fate/FateClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.time.Duration;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

Expand All @@ -46,8 +49,11 @@ public class FateClient<T> {
private static final EnumSet<ReadOnlyFateStore.TStatus> FINISHED_STATES =
EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);

private AtomicReference<Consumer<FateId>> seedingConsumer = new AtomicReference<>(fid -> {});

public FateClient(FateStore<T> store, Function<Repo<T>,String> toLogStrFunc) {
this.store = FateLogger.wrap(store, toLogStrFunc, false);
;
}

// get a transaction id back to the requester before doing any work
Expand All @@ -56,7 +62,23 @@ public FateId startTransaction() {
}

public FateStore.Seeder<T> beginSeeding() {
return store.beginSeeding();
var seeder = store.beginSeeding();
return new FateStore.Seeder<T>() {
@Override
public CompletableFuture<Optional<FateId>> attemptToSeedTransaction(Fate.FateOperation fateOp,
FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
var cfuture = seeder.attemptToSeedTransaction(fateOp, fateKey, repo, autoCleanUp);
return cfuture.thenApply(optional -> {
optional.ifPresent(seedingConsumer.get());
return optional;
});
}

@Override
public void close() {
seeder.close();
}
};
}

public void seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
Expand All @@ -73,6 +95,7 @@ public void seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T> re
boolean autoCleanUp, String goalMessage) {
Fate.log.info("[{}] Seeding {} {} {}", store.type(), fateOp, fateId, goalMessage);
store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
seedingConsumer.get().accept(fateId);
}

// check on the transaction
Expand Down Expand Up @@ -176,4 +199,8 @@ public Exception getException(FateId fateId) {
public Stream<FateKey> list(FateKey.FateKeyType type) {
return store.list(type);
}

public void setSeedingConsumer(Consumer<FateId> seedingConsumer) {
this.seedingConsumer.set(seedingConsumer);
}
}
18 changes: 16 additions & 2 deletions core/src/main/java/org/apache/accumulo/core/fate/FateExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand All @@ -43,6 +44,8 @@
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

import org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
import org.apache.accumulo.core.conf.Property;
Expand Down Expand Up @@ -81,6 +84,7 @@ public class FateExecutor<T> {
private final Set<Fate.FateOperation> fateOps;
private final ConcurrentLinkedQueue<Integer> idleCountHistory = new ConcurrentLinkedQueue<>();
private final FateExecutorMetrics<T> fateExecutorMetrics;
private final AtomicReference<Set<FatePartition>> partitions = new AtomicReference<>(Set.of());

public FateExecutor(Fate<T> fate, T environment, Set<Fate.FateOperation> fateOps, int poolSize,
String name) {
Expand Down Expand Up @@ -298,6 +302,11 @@ protected ConcurrentLinkedQueue<Integer> getIdleCountHistory() {
return idleCountHistory;
}

public void setPartitions(Set<FatePartition> partitions) {
Objects.requireNonNull(partitions);
this.partitions.set(Set.copyOf(partitions));
}

/**
* A single thread that finds transactions to work on and queues them up. Do not want each worker
* thread going to the store and looking for work as it would place more load on the store.
Expand All @@ -308,7 +317,12 @@ private class WorkFinder implements Runnable {
public void run() {
while (fate.getKeepRunning().get() && !isShutdown()) {
try {
fate.getStore().runnable(() -> fate.getKeepRunning().get(), fateIdStatus -> {
var localPartitions = partitions.get();
// if the set of partitions changes, we should stop looking for work w/ the old set of
// partitions
BooleanSupplier keepRunning =
() -> fate.getKeepRunning().get() && localPartitions == partitions.get();
fate.getStore().runnable(localPartitions, keepRunning, fateIdStatus -> {
// The FateId with the fate operation 'fateOp' is workable by this FateExecutor if
// 1) This FateExecutor is assigned to work on 'fateOp' ('fateOp' is in 'fateOps')
// 2) The transaction was cancelled while NEW. This is an edge case that needs to be
Expand All @@ -319,7 +333,7 @@ public void run() {
var fateOp = fateIdStatus.getFateOperation().orElse(null);
if ((fateOp != null && fateOps.contains(fateOp))
|| txCancelledWhileNew(status, fateOp)) {
while (fate.getKeepRunning().get() && !isShutdown()) {
while (keepRunning.getAsBoolean() && !isShutdown()) {
try {
// The reason for calling transfer instead of queueing is avoid rescanning the
// storage layer and adding the same thing over and over. For example if all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.accumulo.core.metrics.Metric;
import org.apache.accumulo.core.metrics.MetricsProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;

public class FateExecutorMetrics<T> implements MetricsProducer {
public class FateExecutorMetrics<T> {
private static final Logger log = LoggerFactory.getLogger(FateExecutorMetrics.class);
private final FateInstanceType type;
private final String poolName;
Expand All @@ -49,7 +48,6 @@ protected FateExecutorMetrics(FateInstanceType type, String poolName,
this.idleWorkerCount = idleWorkerCount;
}

@Override
public void registerMetrics(MeterRegistry registry) {
// noop if already registered or cleared
if (state == State.UNREGISTERED) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.fate;

import java.util.UUID;

import org.apache.accumulo.core.manager.thrift.TFatePartition;

public record FatePartition(FateId start, FateId end) {

public TFatePartition toThrift() {
return new TFatePartition(start.canonical(), end.canonical());
}

public static FatePartition from(TFatePartition tfp) {
return new FatePartition(FateId.from(tfp.start), FateId.from(tfp.stop));
}

private static final FatePartition ALL_USER =
new FatePartition(FateId.from(FateInstanceType.USER, new UUID(0, 0)),
FateId.from(FateInstanceType.USER, new UUID(-1, -1)));
private static final FatePartition ALL_META =
new FatePartition(FateId.from(FateInstanceType.META, new UUID(0, 0)),
FateId.from(FateInstanceType.META, new UUID(-1, -1)));

public static FatePartition all(FateInstanceType type) {
return switch (type) {
case META -> ALL_META;
case USER -> ALL_USER;
};
}

private static final UUID LAST_UUID = new UUID(-1, -1);

public boolean isEndInclusive() {
return end.getTxUUID().equals(LAST_UUID);
}

public boolean contains(FateId fateId) {
if (isEndInclusive()) {
return fateId.compareTo(start) >= 0 && fateId.compareTo(end) <= 0;
} else {
return fateId.compareTo(start) >= 0 && fateId.compareTo(end) < 0;
}

}
}
Loading
Loading