Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.aspectj.lang.annotation.Aspect;
import tech.ydb.yoj.repository.db.Tx;
import tech.ydb.yoj.repository.db.TxManager;
import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException;
import tech.ydb.yoj.repository.db.exception.RetryableException;

/**
Expand Down Expand Up @@ -68,7 +69,7 @@ private Object doInTransaction(ProceedingJoinPoint pjp, YojTransactional transac

return localTx.tx(() -> safeCall(pjp));
}
} catch (CallRetryableException | CallException e) {
} catch (CallRetryableException | CallConditionallyRetryableException | CallException e) {
throw e.getCause();
}
}
Expand All @@ -88,17 +89,28 @@ Object safeCall(ProceedingJoinPoint pjp) {
return pjp.proceed();
} catch (RetryableException e) {
throw new CallRetryableException(e);
} catch (ConditionallyRetryableException e) {
throw new CallConditionallyRetryableException(e);
} catch (Throwable e) {
throw new CallException(e);
}
}

/**
* It's a hint for tx manager to retry was requested
* It's a hint for tx manager that an unconditional retry was requested
*/
static class CallRetryableException extends RetryableException {
CallRetryableException(RetryableException e) {
super(e.getMessage(), e.getCause());
super(e.getMessage(), e.getRetryPolicy(), e.getCause());
}
}

/**
* It's a hint for tx manager that a conditional retry was requested
*/
static class CallConditionallyRetryableException extends ConditionallyRetryableException {
CallConditionallyRetryableException(ConditionallyRetryableException e) {
super(e.getMessage(), e.getRetryPolicy(), e.getCause());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.yoj.repository.test.inmemory;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import lombok.Getter;
Expand All @@ -23,7 +24,10 @@
import java.util.function.Supplier;

public class InMemoryRepositoryTransaction implements BaseDb, RepositoryTransaction {
private final static AtomicLong txIdGenerator = new AtomicLong();
private static final String CLOSE_ACTION_COMMIT = "commit()";
private static final String CLOSE_ACTION_ROLLBACK = "rollback()";

private static final AtomicLong txIdGenerator = new AtomicLong();

private final long txId = txIdGenerator.incrementAndGet();
private final Stopwatch txStopwatch = Stopwatch.createStarted();
Expand Down Expand Up @@ -81,7 +85,12 @@ public void commit() {
if (isBadSession) {
throw new IllegalStateException("Transaction was invalidated. Commit isn't possible");
}
endTransaction("commit()", this::commitImpl);
endTransaction(CLOSE_ACTION_COMMIT, this::commitImpl);
}

@Override
public boolean wasCommitAttempted() {
return CLOSE_ACTION_COMMIT.equals(closeAction);
}

private void commitImpl() {
Expand All @@ -101,14 +110,15 @@ private void commitImpl() {

@Override
public void rollback() {
endTransaction("rollback()", this::rollbackImpl);
endTransaction(CLOSE_ACTION_ROLLBACK, this::rollbackImpl);
}

private void rollbackImpl() {
storage.rollback(txId);
}

private void endTransaction(String action, Runnable runnable) {
ensureTransactionActive();
try {
if (isFinalActionNeeded(action)) {
logTransaction(action, runnable);
Expand All @@ -134,6 +144,7 @@ private boolean isFinalActionNeeded(String action) {
final <T extends Entity<T>> void doInWriteTransaction(
String log, TableDescriptor<T> tableDescriptor, Consumer<WriteTxDataShard<T>> consumer
) {
ensureTransactionActive();
if (options.isScan()) {
throw new IllegalTransactionScanException("Mutable operations");
}
Expand All @@ -158,6 +169,7 @@ final <T extends Entity<T>> void doInWriteTransaction(
final <T extends Entity<T>, R> R doInTransaction(
String action, TableDescriptor<T> tableDescriptor, Function<ReadOnlyTxDataShard<T>, R> func
) {
ensureTransactionActive();
return logTransaction(action, () -> {
InMemoryTxLockWatcher findWatcher = hasWrites ? watcher : InMemoryTxLockWatcher.NO_LOCKS;
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(
Expand All @@ -180,10 +192,6 @@ private void logTransaction(String action, Runnable runnable) {
}

private <R> R logTransaction(String action, Supplier<R> supplier) {
if (closeAction != null) {
throw new IllegalStateException("Transaction already closed by " + closeAction);
}

Stopwatch sw = Stopwatch.createStarted();
try {
R result = supplier.get();
Expand All @@ -195,6 +203,10 @@ private <R> R logTransaction(String action, Supplier<R> supplier) {
}
}

private void ensureTransactionActive() {
Preconditions.checkState(closeAction == null, "Transaction already closed by %s", closeAction);
}

private String printResult(Object result) {
if (result instanceof Iterable<?>) {
long size = Iterables.size((Iterable<?>) result);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package tech.ydb.yoj.repository.ydb.exception;

import tech.ydb.yoj.repository.db.exception.RetryableException;
import tech.ydb.yoj.util.retry.RetryPolicy;

/**
* Tried to use a no longer active or valid YDB session, e.g. on a node that is now down.
*/
public class BadSessionException extends RetryableException {
public BadSessionException(String message) {
super(message);
public class BadSessionException extends YdbUnconditionallyRetryableException {
public BadSessionException(Enum<?> statusCode, Object request, Object response) {
super("Bad session", statusCode, request, response, RetryPolicy.retryImmediately());
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package tech.ydb.yoj.repository.ydb.exception;

/**
* Represents an unexpected condition for YOJ: unknown YDB request status, or some unexpected internal thing happening.
*/
public final class UnexpectedException extends YdbRepositoryException {
public UnexpectedException(Enum<?> statusCode, Object request, Object response) {
super("Unknown YDB status", statusCode, request, response);
}

public UnexpectedException(String message) {
super(message);
this(message, null);
}

public UnexpectedException(String message, Throwable cause) {
super(message, cause);
super(message, null, null, null, cause);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
package tech.ydb.yoj.repository.ydb.exception;

import tech.ydb.yoj.repository.db.exception.RepositoryException;
import tech.ydb.yoj.repository.db.exception.RetryableException;
import tech.ydb.yoj.repository.db.exception.UnavailableException;
import tech.ydb.yoj.util.lang.Strings;
import tech.ydb.yoj.util.retry.RetryPolicy;

/**
* One or more YDB components are not available, but the YDB API was still able to respond.
*/
public final class YdbComponentUnavailableException extends RetryableException {
public final class YdbComponentUnavailableException extends YdbUnconditionallyRetryableException {
private static final RetryPolicy UNAVAILABLE_RETRY_POLICY = RetryPolicy.fixed(100L, 0.2);

public YdbComponentUnavailableException(Object request, Object response) {
super(Strings.join("\n", request, response), UNAVAILABLE_RETRY_POLICY);
}

public YdbComponentUnavailableException(String message, Throwable t) {
super(message, UNAVAILABLE_RETRY_POLICY, t);
public YdbComponentUnavailableException(Enum<?> statusCode, Object request, Object response) {
super("Some database components are not available, but we still got a reply from the DB",
statusCode, request, response, UNAVAILABLE_RETRY_POLICY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package tech.ydb.yoj.repository.ydb.exception;

import tech.ydb.yoj.ExperimentalApi;
import tech.ydb.yoj.repository.db.exception.ConditionallyRetryableException;
import tech.ydb.yoj.util.retry.RetryPolicy;

import javax.annotation.Nullable;

/**
* A <em>conditionally-retryable</em> exception from the YDB database, the YDB Java SDK, or the GRPC client used
* by the YDB Java SDK.
*
* @see ConditionallyRetryableException Conditionally-retryable exceptions
*/
@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/165")
public class YdbConditionallyRetryableException extends ConditionallyRetryableException {
private static final RetryPolicy UNDETERMINED_BACKOFF = RetryPolicy.expBackoff(5L, 500L, 0.1, 2.0);

private final Enum<?> statusCode;

public YdbConditionallyRetryableException(Enum<?> statusCode, Object request, Object response) {
this("Indeterminate request state: it's not known if the request succeeded or failed",
statusCode, request, response, UNDETERMINED_BACKOFF);
}

public YdbConditionallyRetryableException(String message, Enum<?> statusCode, Object request, Object response, RetryPolicy retryPolicy) {
super(YdbRepositoryException.errorMessage(message, statusCode, request, response), retryPolicy);
this.statusCode = statusCode;
}

@Nullable
public Enum<?> getStatusCode() {
return statusCode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package tech.ydb.yoj.repository.ydb.exception;

/**
* Represents a non-retryable YDB error.
*/
public final class YdbInternalException extends YdbRepositoryException {
public YdbInternalException(Enum<?> statusCode, Object request, Object response) {
super("Bad YDB response status", statusCode, request, response);
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
package tech.ydb.yoj.repository.ydb.exception;

import tech.ydb.yoj.repository.db.exception.RepositoryException;
import tech.ydb.yoj.repository.db.exception.RetryableException;
import tech.ydb.yoj.repository.db.exception.UnavailableException;
import tech.ydb.yoj.util.lang.Strings;
import tech.ydb.yoj.util.retry.RetryPolicy;

/**
* YDB node is overloaded, but the YDB API was still able to respond.
*/
public final class YdbOverloadedException extends RetryableException {
public final class YdbOverloadedException extends YdbUnconditionallyRetryableException {
private static final RetryPolicy OVERLOADED_BACKOFF = RetryPolicy.expBackoff(100L, 1_000L, 0.1, 2.0);

public YdbOverloadedException(Object request, Object response) {
super(Strings.join("\n", request, response), OVERLOADED_BACKOFF);
public YdbOverloadedException(Enum<?> statusCode, Object request, Object response) {
super("Database overloaded", statusCode, request, response, OVERLOADED_BACKOFF);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package tech.ydb.yoj.repository.ydb.exception;

/**
* Query precondition failed.
*/
public final class YdbPreconditionFailedException extends YdbRepositoryException {
public YdbPreconditionFailedException(Object request, Object response) {
super("Query precondition failed", request, response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,45 @@
import tech.ydb.yoj.repository.db.exception.ImplementationSpecificRepositoryException;
import tech.ydb.yoj.util.lang.Strings;

import javax.annotation.Nullable;

/**
* Base class for non-retryable YDB-specific exceptions.
* A generic non-retryable exception from the YDB database, the YDB Java SDK, or the GRPC client used
* by the YDB Java SDK.
*/
// TODO: make abstract
@SuppressWarnings("checkstyle:LeftCurly")
public sealed class YdbRepositoryException
public abstract sealed class YdbRepositoryException
extends ImplementationSpecificRepositoryException
permits ResultTruncatedException, UnexpectedException
{
public YdbRepositoryException(Object request, Object response) {
this(null, request, response);
permits ResultTruncatedException, YdbPreconditionFailedException, YdbInternalException, UnexpectedException {
private final Enum<?> statusCode;

public YdbRepositoryException(Enum<?> statusCode, Object request, Object response) {
this(null, statusCode, request, response);
}

public YdbRepositoryException(String message, Object request, Object response) {
this(Strings.join("\n", message, request, response));
this(message, null, request, response);
}

public YdbRepositoryException(String message, Enum<?> statusCode, Object request, Object response) {
this(message, statusCode, request, response, null);
}

public YdbRepositoryException(String message, Enum<?> statusCode, Object request, Object response, Throwable cause) {
super(errorMessage(message, statusCode, request, response), cause);
this.statusCode = statusCode;
}

public YdbRepositoryException(String message) {
super(message);
/*package*/ static String errorMessage(
@Nullable String message,
@Nullable Enum<?> statusCode,
@Nullable Object request,
@Nullable Object response
) {
return Strings.join("\n", Strings.join(" | ", statusCode, message), request, response);
}

public YdbRepositoryException(String message, Throwable cause) {
super(message, cause);
@Nullable
public Enum<?> getStatusCode() {
return statusCode;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package tech.ydb.yoj.repository.ydb.exception;

import tech.ydb.yoj.repository.db.exception.RepositoryException;
import tech.ydb.yoj.repository.db.exception.UnavailableException;
import tech.ydb.yoj.util.retry.RetryPolicy;

/**
* Could not acquire session from session pool within the timeout.
*/
public final class YdbSessionNotAcquiredException extends YdbUnconditionallyRetryableException {
private static final RetryPolicy OVERLOADED_BACKOFF = RetryPolicy.expBackoff(100L, 1_000L, 0.1, 2.0);

public YdbSessionNotAcquiredException(Enum<?> statusCode, Object request, Object response) {
super("Timed out waiting to get a session from the pool", statusCode, request, response, OVERLOADED_BACKOFF);
}

@Override
public RepositoryException rethrow() {
return UnavailableException.afterRetries("Session not acquired, retries failed", this);
}
}
Loading
Loading