diff --git a/sdk/cosmos/azure-cosmos-tests/pom.xml b/sdk/cosmos/azure-cosmos-tests/pom.xml index fc7765209c4d..3131458840f7 100644 --- a/sdk/cosmos/azure-cosmos-tests/pom.xml +++ b/sdk/cosmos/azure-cosmos-tests/pom.xml @@ -906,10 +906,10 @@ Licensed under the MIT License. - - fault-injection-barrier + + multi-region-strong - fault-injection-barrier + multi-region-strong @@ -919,9 +919,14 @@ Licensed under the MIT License. 3.5.3 - src/test/resources/fault-injection-barrier-testng.xml + src/test/resources/multi-region-strong.xml - + + true + 1 + 256 + paranoid + diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/BailOutFromBarrierE2ETests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/BailOutFromBarrierE2ETests.java new file mode 100644 index 000000000000..92ced6c37405 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/BailOutFromBarrierE2ETests.java @@ -0,0 +1,543 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos; + +import com.azure.cosmos.implementation.AsyncDocumentClient; +import com.azure.cosmos.implementation.ClientSideRequestStatistics; +import com.azure.cosmos.implementation.DatabaseAccount; +import com.azure.cosmos.implementation.DatabaseAccountLocation; +import com.azure.cosmos.implementation.GlobalEndpointManager; +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.ImplementationBridgeHelpers; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.Strings; +import com.azure.cosmos.implementation.directconnectivity.ConsistencyReader; +import com.azure.cosmos.implementation.directconnectivity.ConsistencyWriter; +import com.azure.cosmos.implementation.directconnectivity.StoreResponseDiagnostics; +import com.azure.cosmos.implementation.directconnectivity.StoreResponseInterceptorUtils; +import com.azure.cosmos.implementation.directconnectivity.StoreResultDiagnostics; +import com.azure.cosmos.models.CosmosItemResponse; +import com.azure.cosmos.models.PartitionKey; +import com.azure.cosmos.rx.TestSuiteBase; +import com.azure.cosmos.test.implementation.interceptor.CosmosInterceptorHelper; +import org.testng.SkipException; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Factory; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +public class BailOutFromBarrierE2ETests extends TestSuiteBase { + + private static final ImplementationBridgeHelpers.CosmosAsyncClientHelper.CosmosAsyncClientAccessor cosmosAsyncClientAccessor + = ImplementationBridgeHelpers.CosmosAsyncClientHelper.getCosmosAsyncClientAccessor(); + + private volatile CosmosAsyncDatabase database; + private volatile CosmosAsyncContainer container; + private List preferredRegions; + private Map regionNameToEndpoint; + + @BeforeClass(groups = {"multi-region-strong"}) + public void beforeClass() { + + try (CosmosAsyncClient dummy = getClientBuilder().buildAsyncClient()) { + AsyncDocumentClient asyncDocumentClient = BridgeInternal.getContextClient(dummy); + GlobalEndpointManager globalEndpointManager = asyncDocumentClient.getGlobalEndpointManager(); + + DatabaseAccount databaseAccount = globalEndpointManager.getLatestDatabaseAccount(); + + AccountLevelLocationContext accountLevelContext = getAccountLevelLocationContext(databaseAccount, false); + + // Set preferred regions starting with secondary region + this.preferredRegions = new ArrayList<>(accountLevelContext.serviceOrderedReadableRegions); + if (this.preferredRegions.size() > 1) { + // Swap first and second to make secondary region preferred + String temp = this.preferredRegions.get(0); + this.preferredRegions.set(0, this.preferredRegions.get(1)); + this.preferredRegions.set(1, temp); + } + + this.regionNameToEndpoint = accountLevelContext.regionNameToEndpoint; + this.database = getSharedCosmosDatabase(dummy); + this.container = getSharedSinglePartitionCosmosContainer(dummy); + } + } + + @Factory(dataProvider = "clientBuildersWithDirectTcp") + public BailOutFromBarrierE2ETests(CosmosClientBuilder clientBuilder) { + super(clientBuilder); + } + + + /** + * Provides test scenarios for head request lease not found (410/1022) error handling. + * + *

Each scenario is defined by the following parameters:

+ *
    + *
  • headFailureCount: Number of Head requests that should fail with 410/1022 status codes
  • + *
  • operationTypeForWhichBarrierFlowIsTriggered: The operation type (Create or Read) that triggers the barrier flow
  • + *
  • enterPostQuorumSelectionOnlyBarrierLoop: Whether to enter the post quorum selection only barrier loop
  • + *
  • successfulHeadRequestCountWhichDontMeetBarrier: Number of successful Head requests (204 status code) + * that don't meet barrier requirements before failures start
  • + *
  • isCrossRegionRetryExpected: Whether a cross-region retry is expected for the scenario
  • + *
  • desiredClientLevelConsistency: The desired client-level consistency (STRONG or BOUNDED_STALENESS)
  • + *
+ * + *

Important Notes:

+ *
    + *
  • Scenarios are scoped to cases where document requests succeeded but barrier flows were triggered + * (excludes QuorumNotMet scenarios which scope all barriers to primary)
  • + *
  • Create operations tolerate up to 2 Head failures before failing with timeout
  • + *
  • Read operations tolerate 3-5 Head failures before requiring cross-region retry
  • + *
  • In the QuorumSelected phase, successful Head requests that don't meet barrier requirements are tolerated: + *
      + *
    • Up to 18 successful Head requests for BOUNDED_STALENESS consistency
    • + *
    • Up to 111 successful Head requests for STRONG consistency
    • + *
    + * (See QuorumReader - maxNumberOfReadBarrierReadRetries and maxBarrierRetriesForMultiRegion)
  • + *
+ * + * @return array of test scenario parameters for head request lease not found error handling + */ + @DataProvider(name = "headRequestLeaseNotFoundScenarios") + public static Object[][] headRequestLeaseNotFoundScenarios() { + + return new Object[][]{ + { 1, OperationType.Create, false, 0, false, null }, + { 2, OperationType.Create, false, 0, false, null }, + { 3, OperationType.Create, false, 0, false, null }, + { 4, OperationType.Create, false, 0, false, null }, + { 400, OperationType.Create, false, 0, false, null }, + { 1, OperationType.Read, false, 0, false, null }, + { 2, OperationType.Read, false, 0, false, null }, + { 3, OperationType.Read, false, 0, false, null }, + { 4, OperationType.Read, false, 0, true, null }, + { 400, OperationType.Read, false, 0, true, null }, + { 1, OperationType.Read, true, 18, false, ConsistencyLevel.BOUNDED_STALENESS }, + { 2, OperationType.Read, true, 18, false, ConsistencyLevel.BOUNDED_STALENESS }, + { 3, OperationType.Read, true, 18, false, ConsistencyLevel.BOUNDED_STALENESS }, + { 4, OperationType.Read, true, 18, false, ConsistencyLevel.BOUNDED_STALENESS }, + { 5, OperationType.Read, true, 18, true, ConsistencyLevel.BOUNDED_STALENESS }, + { 1, OperationType.Read, true, 18, false, ConsistencyLevel.STRONG }, + { 2, OperationType.Read, true, 18, false, ConsistencyLevel.STRONG }, + { 3, OperationType.Read, true, 18, false, ConsistencyLevel.STRONG }, + { 4, OperationType.Read, true, 18, true, ConsistencyLevel.STRONG }, + { 1, OperationType.Read, true, 108, false, ConsistencyLevel.STRONG }, + { 2, OperationType.Read, true, 108, false, ConsistencyLevel.STRONG }, + { 3, OperationType.Read, true, 108, false, ConsistencyLevel.STRONG }, + { 4, OperationType.Read, true, 108, false, ConsistencyLevel.STRONG }, + { 5, OperationType.Read, true, 108, true, ConsistencyLevel.STRONG } + }; + } + + /** + * Validates that the consistency layer properly handles lease not found (410/1022) errors during + * barrier head requests and implements appropriate bailout/retry strategies based on the failure count, + * operation type, and consistency level. + * + *

This test verifies the resilience and fault tolerance of the barrier flow mechanism in the + * consistency layer when head requests fail with lease not found errors. The barrier flow is triggered + * when documents requests succeed but additional head requests are needed to ensure consistency guarantees + * are met across replicas.

+ * + *

Test Scenarios:

+ *
    + *
  • Create Operations: Can tolerate up to 2 head failures before timing out. Beyond this threshold, + * the operation fails with a 408 timeout status code with 1022 substatus.
  • + *
  • Read Operations: Can tolerate 3-4 head failures within the same region. After 4 failures, + * the operation triggers a cross-region retry to ensure eventual consistency.
  • + *
  • Post-Quorum Selection Barrier Loop: Tests scenarios where initial barriers pass but subsequent + * barriers in the quorum-selected phase encounter failures. The system tolerates different numbers of + * successful head requests that don't meet barrier requirements: + *
      + *
    • Up to 18 for BOUNDED_STALENESS consistency
    • + *
    • Up to 111 for STRONG consistency
    • + *
    + *
  • + *
+ * + *

Validation Steps:

+ *
    + *
  1. Creates a CosmosAsyncClient with the specified consistency level and preferred regions
  2. + *
  3. Intercepts store responses to inject controlled head request failures (410/1022)
  4. + *
  5. Executes the specified operation (Create or Read) that triggers the barrier flow
  6. + *
  7. Validates the operation completes successfully or fails appropriately based on failure count
  8. + *
  9. Examines diagnostics to verify: + *
      + *
    • Correct number of head requests were attempted
    • + *
    • Expected number of regions were contacted
    • + *
    • Primary replica was contacted when failures occurred
    • + *
    • No 410 errors reached the Create/Read operations themselves
    • + *
    + *
  10. + *
+ * + *

Important Notes:

+ *
    + *
  • Test scenarios are scoped to cases where document requests succeeded but barrier flows were triggered, + * excluding QuorumNotMet scenarios which scope all barriers to the primary region
  • + *
  • The test uses an interceptor client to inject failures at precise points in the barrier flow
  • + *
  • Cross-region retry is only expected for Read operations exceeding the failure threshold
  • + *
  • The test validates that the system never exposes 410/1022 errors to the application layer for + * the primary Create/Read operations - only head requests should encounter these errors
  • + *
+ * + * @param headFailureCount the number of head requests that should fail with 410/1022 status codes + * before the system either succeeds, times out, or retries in another region + * @param operationTypeForWhichBarrierFlowIsTriggered the type of operation (Create or Read) that triggers + * the barrier flow and determines retry behavior + * @param enterPostQuorumSelectionOnlyBarrierLoop if true, allows initial barriers to pass and only injects + * failures during the post-quorum selection barrier loop phase + * @param successfulHeadRequestCountWhichDontMeetBarrier the number of successful head requests (204 status) + * that don't meet barrier requirements before failures + * start being injected (only relevant when in post-quorum + * selection barrier loop) + * @param isCrossRegionRetryExpected whether the test expects a cross-region retry to occur based on the + * failure count and operation type + * @param consistencyLevelApplicableForTest the consistency level to configure on the test client (STRONG or + * BOUNDED_STALENESS), which affects barrier retry thresholds + * + * @throws Exception if the test setup fails or unexpected errors occur during test execution + * + * @see ConsistencyReader for barrier flow implementation in read path + * @see ConsistencyWriter for barrier flow implementation in write path + * @see StoreResponseInterceptorUtils for failure injection mechanisms + */ + @Test(groups = {"multi-region-strong"}, dataProvider = "headRequestLeaseNotFoundScenarios", timeOut = 2 * TIMEOUT, retryAnalyzer = FlakyTestRetryAnalyzer.class) + public void validateHeadRequestLeaseNotFoundBailout( + int headFailureCount, + OperationType operationTypeForWhichBarrierFlowIsTriggered, + boolean enterPostQuorumSelectionOnlyBarrierLoop, + int successfulHeadRequestCountWhichDontMeetBarrier, + boolean isCrossRegionRetryExpected, + ConsistencyLevel consistencyLevelApplicableForTest) throws Exception { + + CosmosAsyncClient targetClient = getClientBuilder() + .preferredRegions(this.preferredRegions) + .buildAsyncClient(); + + ConsistencyLevel effectiveConsistencyLevel + = cosmosAsyncClientAccessor.getEffectiveConsistencyLevel(targetClient, operationTypeForWhichBarrierFlowIsTriggered, null); + + ConnectionMode connectionModeOfClientUnderTest + = ConnectionMode.valueOf( + cosmosAsyncClientAccessor.getConnectionMode( + targetClient).toUpperCase()); + + if (!shouldTestExecutionHappen( + effectiveConsistencyLevel, + OperationType.Create.equals(operationTypeForWhichBarrierFlowIsTriggered) ? ConsistencyLevel.STRONG : ConsistencyLevel.BOUNDED_STALENESS, + consistencyLevelApplicableForTest, + connectionModeOfClientUnderTest)) { + + safeClose(targetClient); + + throw new SkipException("Skipping test for arguments: " + + " OperationType: " + operationTypeForWhichBarrierFlowIsTriggered + + " ConsistencyLevel: " + effectiveConsistencyLevel + + " ConnectionMode: " + connectionModeOfClientUnderTest + + " DesiredConsistencyLevel: " + consistencyLevelApplicableForTest); + } + + AtomicInteger successfulHeadCountTracker = new AtomicInteger(); + AtomicInteger failedHeadCountTracker = new AtomicInteger(); + + try { + + TestObject testObject = TestObject.create(); + + if (enterPostQuorumSelectionOnlyBarrierLoop) { + if (OperationType.Read.equals(operationTypeForWhichBarrierFlowIsTriggered)) { + CosmosInterceptorHelper.registerTransportClientInterceptor(targetClient, StoreResponseInterceptorUtils.forceSuccessfulBarriersOnReadUntilQuorumSelectionThenForceBarrierFailures( + effectiveConsistencyLevel, + this.regionNameToEndpoint.get(this.preferredRegions.get(0)), + successfulHeadRequestCountWhichDontMeetBarrier, + successfulHeadCountTracker, + headFailureCount, + failedHeadCountTracker, + HttpConstants.StatusCodes.GONE, + HttpConstants.SubStatusCodes.LEASE_NOT_FOUND + )); + } + } else { + if (OperationType.Create.equals(operationTypeForWhichBarrierFlowIsTriggered)) { + CosmosInterceptorHelper.registerTransportClientInterceptor(targetClient, StoreResponseInterceptorUtils.forceBarrierFollowedByBarrierFailure( + effectiveConsistencyLevel, + this.regionNameToEndpoint.get(this.preferredRegions.get(1)), + headFailureCount, + failedHeadCountTracker, + HttpConstants.StatusCodes.GONE, + HttpConstants.SubStatusCodes.LEASE_NOT_FOUND + )); + } else if (OperationType.Read.equals(operationTypeForWhichBarrierFlowIsTriggered)) { + CosmosInterceptorHelper.registerTransportClientInterceptor(targetClient, StoreResponseInterceptorUtils.forceBarrierFollowedByBarrierFailure( + effectiveConsistencyLevel, + this.regionNameToEndpoint.get(this.preferredRegions.get(0)), + headFailureCount, + failedHeadCountTracker, + HttpConstants.StatusCodes.GONE, + HttpConstants.SubStatusCodes.LEASE_NOT_FOUND + )); + } + } + + try { + CosmosAsyncDatabase targetAsyncDatabase = targetClient.getDatabase(this.database.getId()); + CosmosAsyncContainer targetContainer = targetAsyncDatabase.getContainer(this.container.getId()); + + Thread.sleep(5000); // Wait for collection to be available to be read + + // Assert based on operation type and failure count + if (operationTypeForWhichBarrierFlowIsTriggered == OperationType.Create) { + + // Perform the operation + CosmosItemResponse response = targetContainer.createItem(testObject).block(); + + // For Create, Head can only fail up to 2 times before Create fails with a timeout + if (headFailureCount <= 1) { + // Should eventually succeed + assertThat(response).isNotNull(); + assertThat(response.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.CREATED); + + // Check diagnostics - should have contacted only one region for create + CosmosDiagnostics diagnostics = response.getDiagnostics(); + assertThat(diagnostics).isNotNull(); + + validateHeadRequestsInCosmosDiagnostics(diagnostics, 2, (2 + successfulHeadRequestCountWhichDontMeetBarrier)); + validateContactedRegions(diagnostics, 1); + } else { + // Should timeout with 408 + fail("Should have thrown timeout exception"); + } + } else { + + targetContainer.createItem(testObject).block(); + + CosmosItemResponse response + = targetContainer.readItem(testObject.getId(), new PartitionKey(testObject.getMypk()), TestObject.class).block(); + + // for Read, Head can fail up to 3 times and still succeed from the same region after which read has to go to another region + if (!isCrossRegionRetryExpected) { + // Should eventually succeed + assertThat(response).isNotNull(); + assertThat(response.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + + // Check diagnostics - should have contacted only one region for create + CosmosDiagnostics diagnostics = response.getDiagnostics(); + assertThat(diagnostics).isNotNull(); + + validateHeadRequestsInCosmosDiagnostics(diagnostics, 5, (5 + successfulHeadRequestCountWhichDontMeetBarrier)); + validateContactedRegions(diagnostics, 1); + } else { + // Should eventually succeed + assertThat(response).isNotNull(); + assertThat(response.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.OK); + + CosmosDiagnostics diagnostics = response.getDiagnostics(); + assertThat(diagnostics).isNotNull(); + + validateHeadRequestsInCosmosDiagnostics(diagnostics, 5, (5 + successfulHeadRequestCountWhichDontMeetBarrier)); + validateContactedRegions(diagnostics, 2); + } + } + + } catch (CosmosException e) { + // Expected for some scenarios + if (operationTypeForWhichBarrierFlowIsTriggered == OperationType.Create) { + + if (headFailureCount <= 1) { + fail("Should have succeeded for create with head failures less than or equal to 2"); + } else { + // Should get 408-1022 timeout + assertThat(e.getStatusCode()).isEqualTo(HttpConstants.StatusCodes.REQUEST_TIMEOUT); + assertThat(e.getSubStatusCode()).isEqualTo(HttpConstants.SubStatusCodes.LEASE_NOT_FOUND); + + CosmosDiagnostics diagnostics = e.getDiagnostics(); + + validateHeadRequestsInCosmosDiagnostics(diagnostics, 2, (2 + successfulHeadRequestCountWhichDontMeetBarrier)); + validateContactedRegions(diagnostics, 1); + } + } + + if (operationTypeForWhichBarrierFlowIsTriggered == OperationType.Read) { + fail("Read operation should have succeeded even with head failures through cross region retry."); + } + } + + } finally { + safeClose(targetClient); + } + } + + private void validateContactedRegions(CosmosDiagnostics diagnostics, int expectedRegionsContactedCount) { + + CosmosDiagnosticsContext cosmosDiagnosticsContext = diagnostics.getDiagnosticsContext(); + Collection clientSideRequestStatisticsCollection + = diagnostics.getClientSideRequestStatistics(); + + for (ClientSideRequestStatistics clientSideRequestStatistics : clientSideRequestStatisticsCollection) { + + Collection storeResponseDiagnosticsList + = clientSideRequestStatistics.getResponseStatisticsList(); + + for (ClientSideRequestStatistics.StoreResponseStatistics storeResponseStatistics : storeResponseDiagnosticsList) { + if (storeResponseStatistics.getRequestOperationType() == OperationType.Create || storeResponseStatistics.getRequestOperationType() == OperationType.Read) { + + if (storeResponseStatistics.getStoreResult().getStoreResponseDiagnostics().getStatusCode() == 410) { + fail("Should not have encountered 410 for Create/Read operation"); + } + } + } + } + + assertThat(cosmosDiagnosticsContext).isNotNull(); + assertThat(cosmosDiagnosticsContext.getContactedRegionNames()).isNotNull(); + assertThat(cosmosDiagnosticsContext.getContactedRegionNames()).isNotEmpty(); + assertThat(cosmosDiagnosticsContext.getContactedRegionNames().size()).as("Mismatch in regions contacted.").isEqualTo(expectedRegionsContactedCount); + } + + private void validateHeadRequestsInCosmosDiagnostics( + CosmosDiagnostics diagnostics, + int maxExpectedHeadRequestCountWithLeaseNotFoundErrors, + int maxExpectedHeadRequestCount) { + + int actualHeadRequestCountWithLeaseNotFoundErrors = 0; + int actualHeadRequestCount = 0; + boolean primaryReplicaContacted = false; + + Collection clientSideRequestStatisticsCollection + = diagnostics.getClientSideRequestStatistics(); + + for (ClientSideRequestStatistics clientSideRequestStatistics : clientSideRequestStatisticsCollection) { + + Collection storeResponseDiagnosticsList + = clientSideRequestStatistics.getResponseStatisticsList(); + + for (ClientSideRequestStatistics.StoreResponseStatistics storeResponseStatistics : storeResponseDiagnosticsList) { + if (storeResponseStatistics.getRequestOperationType() == OperationType.Create || storeResponseStatistics.getRequestOperationType() == OperationType.Read) { + + if (storeResponseStatistics.getStoreResult().getStoreResponseDiagnostics().getStatusCode() == 410) { + fail("Should not have encountered 410 for Create/Read operation"); + } + } + } + + Collection supplementalResponseStatisticsList + = clientSideRequestStatistics.getSupplementalResponseStatisticsList(); + + for (ClientSideRequestStatistics.StoreResponseStatistics storeResponseStatistics : supplementalResponseStatisticsList) { + if (storeResponseStatistics.getRequestOperationType() == OperationType.Head) { + + StoreResultDiagnostics storeResultDiagnostics = storeResponseStatistics.getStoreResult(); + + assertThat(storeResultDiagnostics).isNotNull(); + + String storePhysicalAddressContacted + = storeResultDiagnostics.getStorePhysicalAddressAsString(); + + StoreResponseDiagnostics storeResponseDiagnostics + = storeResultDiagnostics.getStoreResponseDiagnostics(); + + assertThat(storeResponseDiagnostics).isNotNull(); + + actualHeadRequestCount++; + + if (storeResponseDiagnostics.getStatusCode() == HttpConstants.StatusCodes.GONE && storeResponseDiagnostics.getSubStatusCode() == HttpConstants.SubStatusCodes.LEASE_NOT_FOUND) { + actualHeadRequestCountWithLeaseNotFoundErrors++; + } + + if (isStorePhysicalAddressThatOfPrimaryReplica(storePhysicalAddressContacted)) { + primaryReplicaContacted = true; + } + } + } + } + + assertThat(actualHeadRequestCountWithLeaseNotFoundErrors).as("Head request failed count with 410/1022 should be greater than 1.").isGreaterThan(0); + assertThat(actualHeadRequestCountWithLeaseNotFoundErrors).as("Too many head request failed.").isLessThanOrEqualTo(maxExpectedHeadRequestCountWithLeaseNotFoundErrors); + assertThat(actualHeadRequestCount).as("Too many Head requests made perhaps due to real replication lag.").isLessThanOrEqualTo(maxExpectedHeadRequestCount + 10); + assertThat(primaryReplicaContacted).as("Primary replica should be contacted even when a single Head request sees a 410/1022").isTrue(); + } + + private AccountLevelLocationContext getAccountLevelLocationContext(DatabaseAccount databaseAccount, boolean writeOnly) { + Iterator locationIterator = + writeOnly ? databaseAccount.getWritableLocations().iterator() : databaseAccount.getReadableLocations().iterator(); + + List serviceOrderedReadableRegions = new ArrayList<>(); + List serviceOrderedWriteableRegions = new ArrayList<>(); + Map regionMap = new ConcurrentHashMap<>(); + + while (locationIterator.hasNext()) { + DatabaseAccountLocation accountLocation = locationIterator.next(); + regionMap.put(accountLocation.getName(), accountLocation.getEndpoint()); + + if (writeOnly) { + serviceOrderedWriteableRegions.add(accountLocation.getName()); + } else { + serviceOrderedReadableRegions.add(accountLocation.getName()); + } + } + + return new AccountLevelLocationContext( + serviceOrderedReadableRegions, + serviceOrderedWriteableRegions, + regionMap); + } + + private boolean shouldTestExecutionHappen( + ConsistencyLevel effectiveConsistencyLevel, + ConsistencyLevel minimumConsistencyLevel, + ConsistencyLevel consistencyLevelApplicableForTestScenario, + ConnectionMode connectionModeOfClientUnderTest) { + + if (!connectionModeOfClientUnderTest.name().equalsIgnoreCase(ConnectionMode.DIRECT.name())) { + return false; + } + + if (consistencyLevelApplicableForTestScenario != null) { + return consistencyLevelApplicableForTestScenario.equals(effectiveConsistencyLevel); + } + + return effectiveConsistencyLevel.compareTo(minimumConsistencyLevel) <= 0; + } + + private static boolean isStorePhysicalAddressThatOfPrimaryReplica(String storePhysicalAddress) { + + if (Strings.isNullOrEmpty(storePhysicalAddress)) { + return false; + } + + return storePhysicalAddress.endsWith("p/"); + } + + private static class AccountLevelLocationContext { + private final List serviceOrderedReadableRegions; + private final List serviceOrderedWriteableRegions; + private final Map regionNameToEndpoint; + + public AccountLevelLocationContext( + List serviceOrderedReadableRegions, + List serviceOrderedWriteableRegions, + Map regionNameToEndpoint) { + + this.serviceOrderedReadableRegions = serviceOrderedReadableRegions; + this.serviceOrderedWriteableRegions = serviceOrderedWriteableRegions; + this.regionNameToEndpoint = regionNameToEndpoint; + } + } + + @AfterClass(groups = {"multi-region-strong"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + public void afterClass() {} +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java index 58998a9ab6e4..08e1d7332eab 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/faultinjection/FaultInjectionServerErrorRuleOnDirectTests.java @@ -84,7 +84,7 @@ public FaultInjectionServerErrorRuleOnDirectTests(CosmosClientBuilder clientBuil this.subscriberValidationTimeout = TIMEOUT; } - @BeforeClass(groups = {"multi-region", "long", "fast", "fi-multi-master", "fault-injection-barrier"}, timeOut = TIMEOUT) + @BeforeClass(groups = {"multi-region", "long", "fast", "fi-multi-master", "multi-region-strong"}, timeOut = TIMEOUT) public void beforeClass() { clientWithoutPreferredRegions = getClientBuilder() .preferredRegions(new ArrayList<>()) @@ -1057,7 +1057,7 @@ public void faultInjectionServerErrorRuleTests_HitLimit() throws JsonProcessingE } } - @AfterClass(groups = {"multi-region", "long", "fast", "fi-multi-master", "fault-injection-barrier"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) + @AfterClass(groups = {"multi-region", "long", "fast", "fi-multi-master", "multi-region-strong"}, timeOut = SHUTDOWN_TIMEOUT, alwaysRun = true) public void afterClass() { safeClose(clientWithoutPreferredRegions); } @@ -1475,7 +1475,7 @@ public void faultInjectionInjectTcpResponseDelay() throws JsonProcessingExceptio } } - @Test(groups = {"fault-injection-barrier"}, dataProvider = "barrierRequestServerErrorResponseProvider", timeOut = 2 * TIMEOUT) + @Test(groups = {"multi-region-strong"}, dataProvider = "barrierRequestServerErrorResponseProvider", timeOut = 2 * TIMEOUT) public void faultInjection_serverError_barrierRequest( OperationType operationType, FaultInjectionServerErrorType serverErrorType, diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java index cf1110aec87f..fe01de341bc6 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/ReflectionUtils.java @@ -309,6 +309,10 @@ public static StoreReader getStoreReader(ConsistencyReader consistencyReader) { return get(StoreReader.class, consistencyReader, "storeReader"); } + public static StoreReader getStoreReader(ConsistencyWriter consistencyWriter) { + return get(StoreReader.class, consistencyWriter, "storeReader"); + } + public static void setStoreReader(ConsistencyReader consistencyReader, StoreReader storeReader) { set(consistencyReader, storeReader, "storeReader"); } diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseInterceptorUtils.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseInterceptorUtils.java new file mode 100644 index 000000000000..bdf0fee5cf86 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/implementation/directconnectivity/StoreResponseInterceptorUtils.java @@ -0,0 +1,149 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.directconnectivity; + +import com.azure.cosmos.ConsistencyLevel; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.Utils; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; + +public class StoreResponseInterceptorUtils { + + public static BiFunction forceBarrierFollowedByBarrierFailure( + ConsistencyLevel operationConsistencyLevel, + String regionName, + int maxAllowedFailureCount, + AtomicInteger failureCount, + int statusCode, + int subStatusCode) { + + return (request, storeResponse) -> { + + if (OperationType.Create.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { + + long localLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN)); + long manipulatedGCLSN = localLsn - 1; + + storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGCLSN)); + + return storeResponse; + } + + if (OperationType.Read.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { + + if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) { + + long globalLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LSN)); + long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN)); + long manipulatedGlobalCommittedLSN = Math.min(globalLsn, itemLsn) - 1; + + storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGlobalCommittedLSN)); + + return storeResponse; + } else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) { + + long manipulatedItemLSN = -1; + long manipulatedGlobalLSN = 0; + + storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN)); + storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN)); + storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN)); + storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN)); + + return storeResponse; + } + + return storeResponse; + } + + if (OperationType.Head.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { + if (failureCount.incrementAndGet() <= maxAllowedFailureCount) { + throw Utils.createCosmosException(statusCode, subStatusCode, new Exception("An intercepted exception occurred. Check status and substatus code for details."), null); + } + } + + return storeResponse; + }; + } + + public static BiFunction forceSuccessfulBarriersOnReadUntilQuorumSelectionThenForceBarrierFailures( + ConsistencyLevel operationConsistencyLevel, + String regionName, + int allowedSuccessfulHeadRequestsWithoutBarrierBeingMet, + AtomicInteger successfulHeadRequestCount, + int maxAllowedFailureCount, + AtomicInteger failureCount, + int statusCode, + int subStatusCode) { + + return (request, storeResponse) -> { + + if (OperationType.Read.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { + + if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) { + + long globalLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LSN)); + long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN)); + long manipulatedGlobalCommittedLSN = Math.min(globalLsn, itemLsn) - 1; + + storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGlobalCommittedLSN)); + + return storeResponse; + } else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) { + + long manipulatedItemLSN = -1; + long manipulatedGlobalLSN = 0; + + storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN)); + storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN)); + storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN)); + storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN)); + + return storeResponse; + } + + return storeResponse; + } + + if (OperationType.Head.equals(request.getOperationType()) && regionName.equals(request.requestContext.regionalRoutingContextToRoute.getGatewayRegionalEndpoint().toString())) { + + if (successfulHeadRequestCount.incrementAndGet() <= allowedSuccessfulHeadRequestsWithoutBarrierBeingMet) { + + if (ConsistencyLevel.STRONG.equals(operationConsistencyLevel)) { + + long localLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN)); + long itemLsn = Long.parseLong(storeResponse.getHeaderValue(WFConstants.BackendHeaders.ITEM_LSN)); + long manipulatedGCLSN = Math.min(localLsn, itemLsn) - 1; + + storeResponse.setHeaderValue(WFConstants.BackendHeaders.GLOBAL_COMMITTED_LSN, String.valueOf(manipulatedGCLSN)); + + return storeResponse; + } else if (ConsistencyLevel.BOUNDED_STALENESS.equals(operationConsistencyLevel)) { + + long manipulatedItemLSN = -1; + long manipulatedGlobalLSN = -1; + + storeResponse.setHeaderValue(WFConstants.BackendHeaders.LSN, String.valueOf(manipulatedGlobalLSN)); + storeResponse.setHeaderValue(WFConstants.BackendHeaders.LOCAL_LSN, String.valueOf(manipulatedGlobalLSN)); + storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LSN, String.valueOf(manipulatedItemLSN)); + storeResponse.setHeaderValue(WFConstants.BackendHeaders.ITEM_LOCAL_LSN, String.valueOf(manipulatedItemLSN)); + + return storeResponse; + } + + return storeResponse; + } + + if (failureCount.incrementAndGet() <= maxAllowedFailureCount) { + throw Utils.createCosmosException(statusCode, subStatusCode, new Exception("An intercepted exception occurred. Check status and substatus code for details."), null); + } + } + + return storeResponse; + }; + } +} diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java index e4118eedc747..d4e06ca7407b 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java +++ b/sdk/cosmos/azure-cosmos-tests/src/test/java/com/azure/cosmos/rx/TestSuiteBase.java @@ -205,7 +205,7 @@ public CosmosAsyncDatabase getDatabase(String id) { @BeforeSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator", "emulator-vnext", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", - "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "fault-injection-barrier"}, timeOut = SUITE_SETUP_TIMEOUT) + "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong"}, timeOut = SUITE_SETUP_TIMEOUT) public void beforeSuite() { logger.info("beforeSuite Started"); @@ -223,7 +223,7 @@ public void beforeSuite() { @AfterSuite(groups = {"thinclient", "fast", "long", "direct", "multi-region", "multi-master", "flaky-multi-master", "emulator", "split", "query", "cfp-split", "circuit-breaker-misc-gateway", "circuit-breaker-misc-direct", - "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "fault-injection-barrier"}, timeOut = SUITE_SHUTDOWN_TIMEOUT) + "circuit-breaker-read-all-read-many", "fi-multi-master", "long-emulator", "fi-thinclient-multi-region", "fi-thinclient-multi-master", "multi-region-strong"}, timeOut = SUITE_SHUTDOWN_TIMEOUT) public void afterSuite() { logger.info("afterSuite Started"); diff --git a/sdk/cosmos/azure-cosmos-tests/src/test/resources/fault-injection-barrier-testng.xml b/sdk/cosmos/azure-cosmos-tests/src/test/resources/multi-region-strong.xml similarity index 90% rename from sdk/cosmos/azure-cosmos-tests/src/test/resources/fault-injection-barrier-testng.xml rename to sdk/cosmos/azure-cosmos-tests/src/test/resources/multi-region-strong.xml index 6444f7176010..9b8607cbf971 100644 --- a/sdk/cosmos/azure-cosmos-tests/src/test/resources/fault-injection-barrier-testng.xml +++ b/sdk/cosmos/azure-cosmos-tests/src/test/resources/multi-region-strong.xml @@ -21,14 +21,14 @@ ~ SOFTWARE. --> - + - + - + diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index b9a62661e695..47ca8bbf4b36 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -12,6 +12,7 @@ #### Other Changes * Enabled hostname validation for RNTBD connections to backend - [PR 47111](https://github.com/Azure/azure-sdk-for-java/pull/47111) * Changed to use incremental change feed to get partition key ranges. - [46810](https://github.com/Azure/azure-sdk-for-java/pull/46810) +* Optimized 410 `Lease Not Found` handling for Strong Consistency account by avoiding unnecessary retries in the barrier attainment flow. - [PR 47232](https://github.com/Azure/azure-sdk-for-java/pull/47232) ### 4.75.0 (2025-10-21) > [!IMPORTANT] diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java index 92a781cf071c..86b3e6cd0346 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/ConsistencyWriter.java @@ -27,7 +27,6 @@ import com.azure.cosmos.implementation.Strings; import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.collections.ComparatorUtils; -import com.azure.cosmos.implementation.directconnectivity.rntbd.ClosedClientTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; @@ -46,11 +45,14 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static com.azure.cosmos.implementation.Exceptions.isAvoidQuorumSelectionException; + /* * ConsistencyWriter has two modes for writing - local quorum-acked write and globally strong write. * @@ -149,6 +151,8 @@ Mono writePrivateAsync( TimeoutHelper timeout, boolean forceRefresh) { + final AtomicReference cosmosExceptionValueHolder = new AtomicReference<>(null); + if (timeout.isElapsed() && // skip throwing RequestTimeout on first retry because the first retry with // force address refresh header can be critical to recover for example from @@ -280,7 +284,7 @@ Mono writePrivateAsync( false, primaryURI.get(), replicaStatusList); - return barrierForGlobalStrong(request, response); + return barrierForGlobalStrong(request, response, cosmosExceptionValueHolder); }) .doFinally(signalType -> { if (signalType != SignalType.CANCEL) { @@ -296,11 +300,16 @@ Mono writePrivateAsync( } else { Mono barrierRequestObs = BarrierRequestHelper.createAsync(this.diagnosticsClientContext, request, this.authorizationTokenProvider, null, request.requestContext.globalCommittedSelectedLSN); - return barrierRequestObs.flatMap(barrierRequest -> waitForWriteBarrierAsync(barrierRequest, request.requestContext.globalCommittedSelectedLSN) + return barrierRequestObs.flatMap(barrierRequest -> waitForWriteBarrierAsync(barrierRequest, request.requestContext.globalCommittedSelectedLSN, cosmosExceptionValueHolder) .flatMap(v -> { if (!v) { logger.info("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {}", request.requestContext.globalCommittedSelectedLSN); + + if (cosmosExceptionValueHolder.get() != null) { + return Mono.error(cosmosExceptionValueHolder.get()); + } + return Mono.error(new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, HttpConstants.SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET)); } @@ -324,7 +333,11 @@ boolean isGlobalStrongRequest(RxDocumentServiceRequest request, StoreResponse re return false; } - Mono barrierForGlobalStrong(RxDocumentServiceRequest request, StoreResponse response) { + Mono barrierForGlobalStrong( + RxDocumentServiceRequest request, + StoreResponse response, + AtomicReference cosmosExceptionValueHolder) { + try { if (ReplicatedResourceClient.isGlobalStrongEnabled() && this.isGlobalStrongRequest(request, response)) { Utils.ValueHolder lsn = Utils.ValueHolder.initialize(-1L); @@ -355,12 +368,17 @@ Mono barrierForGlobalStrong(RxDocumentServiceRequest request, Sto request.requestContext.globalCommittedSelectedLSN); return barrierRequestObs.flatMap(barrierRequest -> { - Mono barrierWait = this.waitForWriteBarrierAsync(barrierRequest, request.requestContext.globalCommittedSelectedLSN); + Mono barrierWait = this.waitForWriteBarrierAsync(barrierRequest, request.requestContext.globalCommittedSelectedLSN, cosmosExceptionValueHolder); return barrierWait.flatMap(res -> { if (!res) { logger.error("ConsistencyWriter: Write barrier has not been met for global strong request. SelectedGlobalCommittedLsn: {}", request.requestContext.globalCommittedSelectedLSN); + + if (cosmosExceptionValueHolder.get() != null) { + return Mono.error(cosmosExceptionValueHolder.get()); + } + // RxJava1 doesn't allow throwing checked exception return Mono.error(new GoneException(RMResources.GlobalStrongWriteBarrierNotMet, HttpConstants.SubStatusCodes.GLOBAL_STRONG_WRITE_BARRIER_NOT_MET)); @@ -384,7 +402,11 @@ Mono barrierForGlobalStrong(RxDocumentServiceRequest request, Sto } } - private Mono waitForWriteBarrierAsync(RxDocumentServiceRequest barrierRequest, long selectedGlobalCommittedLsn) { + private Mono waitForWriteBarrierAsync( + RxDocumentServiceRequest barrierRequest, + long selectedGlobalCommittedLsn, + AtomicReference cosmosExceptionValueHolder) { + AtomicInteger writeBarrierRetryCount = new AtomicInteger(ConsistencyWriter.MAX_NUMBER_OF_WRITE_BARRIER_READ_RETRIES); AtomicLong maxGlobalCommittedLsnReceived = new AtomicLong(0); return Flux.defer(() -> { @@ -392,6 +414,10 @@ private Mono waitForWriteBarrierAsync(RxDocumentServiceRequest barrierR return Flux.error(new RequestTimeoutException()); } + if (writeBarrierRetryCount.get() == 0) { + return Mono.just(false); + } + Mono> storeResultListObs = this.storeReader.readMultipleReplicaAsync( barrierRequest, true /*allowPrimary*/, @@ -403,6 +429,27 @@ private Mono waitForWriteBarrierAsync(RxDocumentServiceRequest barrierR false /*forceReadAll*/); return storeResultListObs.flatMap( responses -> { + + boolean isAvoidQuorumSelectionStoreResult = false; + CosmosException cosmosExceptionFromStoreResult = null; + + for (StoreResult storeResult : responses) { + if (storeResult.isAvoidQuorumSelectionException) { + isAvoidQuorumSelectionStoreResult = true; + cosmosExceptionFromStoreResult = storeResult.getException(); + break; + } + } + + if (isAvoidQuorumSelectionStoreResult) { + writeBarrierRetryCount.decrementAndGet(); + return this.isBarrierMeetPossibleInPresenceOfAvoidQuorumSelectionException( + barrierRequest, + selectedGlobalCommittedLsn, + cosmosExceptionValueHolder, + cosmosExceptionFromStoreResult); + } + if (responses != null && responses.stream().anyMatch(response -> response.globalCommittedLSN >= selectedGlobalCommittedLsn)) { return Mono.just(Boolean.TRUE); } @@ -459,6 +506,86 @@ static void getLsnAndGlobalCommittedLsn(StoreResponse response, Utils.ValueHolde } } + private Mono isBarrierMeetPossibleInPresenceOfAvoidQuorumSelectionException( + RxDocumentServiceRequest barrierRequest, + long selectedGlobalCommittedLsn, + AtomicReference cosmosExceptionValueHolder, + CosmosException cosmosExceptionInStoreResult) { + + AtomicBoolean bailFromWriteBarrierLoop = new AtomicBoolean(false); + + return performOptimisticBarrierOnPrimaryAndDetermineIfBarrierCanBeSatisfied( + barrierRequest, + selectedGlobalCommittedLsn, + cosmosExceptionValueHolder, + bailFromWriteBarrierLoop).flatMap(isBarrierFromPrimarySuccessful -> { + + if (isBarrierFromPrimarySuccessful) { + bailFromWriteBarrierLoop.set(true); + cosmosExceptionValueHolder.set(null); + + return Mono.just(true); + } + + if (bailFromWriteBarrierLoop.get()) { + bailFromWriteBarrierLoop.set(true); + cosmosExceptionValueHolder.set(Utils.createCosmosException( + HttpConstants.StatusCodes.REQUEST_TIMEOUT, + cosmosExceptionInStoreResult.getSubStatusCode(), + cosmosExceptionInStoreResult, + null)); + return Mono.just(false); + } else { + bailFromWriteBarrierLoop.set(false); + cosmosExceptionValueHolder.set(null); + return Mono.empty(); + } + }); + } + + private Mono performOptimisticBarrierOnPrimaryAndDetermineIfBarrierCanBeSatisfied( + RxDocumentServiceRequest barrierRequest, + long selectedGlobalCommittedLSN, + AtomicReference cosmosExceptionValueHolder, + AtomicBoolean bailFromWriteBarrierLoop) { + + barrierRequest.requestContext.forceRefreshAddressCache = true; + Mono storeResultObs = this.storeReader.readPrimaryAsync( + barrierRequest, true, false /*useSessionToken*/); + + return storeResultObs.flatMap(storeResult -> { + if (!storeResult.isValid) { + return Mono.just(false); + } + + boolean hasRequiredGlobalCommittedLsn = storeResult.globalCommittedLSN >= selectedGlobalCommittedLSN; + + barrierRequest.requestContext.forceRefreshAddressCache = false; + return Mono.just(hasRequiredGlobalCommittedLsn); + }) + .onErrorResume(throwable -> { + + barrierRequest.requestContext.forceRefreshAddressCache = false; + + if (throwable instanceof CosmosException) { + CosmosException cosmosException = Utils.as(throwable, CosmosException.class); + + if (isAvoidQuorumSelectionException(cosmosException)) { + + bailFromWriteBarrierLoop.set(true); + cosmosExceptionValueHolder.set(cosmosException); + return Mono.just(false); + } + + bailFromWriteBarrierLoop.set(false); + return Mono.just(false); + } + + bailFromWriteBarrierLoop.set(false); + return Mono.just(false); + }); + } + void startBackgroundAddressRefresh(RxDocumentServiceRequest request) { this.addressSelector.resolvePrimaryUriAsync(request, true) .publishOn(Schedulers.boundedElastic()) diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java index 1f01e66076d6..487d2184db39 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/QuorumReader.java @@ -12,7 +12,6 @@ import com.azure.cosmos.implementation.Exceptions; import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.HttpConstants; -import com.azure.cosmos.implementation.ImplementationBridgeHelpers; import com.azure.cosmos.implementation.InternalServerErrorException; import com.azure.cosmos.implementation.Configs; import com.azure.cosmos.implementation.IAuthorizationTokenProvider; @@ -22,6 +21,7 @@ import com.azure.cosmos.implementation.RMResources; import com.azure.cosmos.implementation.RequestChargeTracker; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.Utils; import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,10 +32,13 @@ import java.util.Comparator; import java.util.List; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import static com.azure.cosmos.implementation.Exceptions.isAvoidQuorumSelectionException; import static com.azure.cosmos.implementation.Utils.ValueHolder; // @@ -135,6 +138,8 @@ public Mono readStrongAsync( final MutableVolatile shouldRetryOnSecondary = new MutableVolatile<>(false); final MutableVolatile hasPerformedReadFromPrimary = new MutableVolatile<>(false); final MutableVolatile includePrimary = new MutableVolatile<>(false); + final AtomicReference cosmosExceptionValueHolder = new AtomicReference<>(null); + final AtomicBoolean bailOnBarrierValueHolder = new AtomicBoolean(false); return Flux.defer( // the following will be repeated till the repeat().takeUntil(.) condition is satisfied. @@ -149,7 +154,9 @@ public Mono readStrongAsync( entity, readQuorumValue, includePrimary.v, - readMode); + readMode, + cosmosExceptionValueHolder, + bailOnBarrierValueHolder); return secondaryQuorumReadResultObs.flux().flatMap( secondaryQuorumReadResult -> { @@ -185,7 +192,9 @@ public Mono readStrongAsync( readQuorumValue, secondaryQuorumReadResult.selectedLsn, secondaryQuorumReadResult.globalCommittedSelectedLsn, - readMode); + readMode, + cosmosExceptionValueHolder, + bailOnBarrierValueHolder); return readBarrierObs.flux().flatMap( readBarrier -> { @@ -291,11 +300,15 @@ public Mono readStrongAsync( .switchIfEmpty(Flux.defer(() -> { logger.info("Could not complete read quorum with read quorum value of {}", readQuorumValue); - return Flux.error(new GoneException( + if (cosmosExceptionValueHolder.get() != null) { + return Flux.error(cosmosExceptionValueHolder.get()); + } + + return Flux.error(new GoneException( String.format( RMResources.ReadQuorumNotMet, readQuorumValue), - HttpConstants.SubStatusCodes.READ_QUORUM_NOT_MET)); + HttpConstants.SubStatusCodes.READ_QUORUM_NOT_MET)); })) .take(1) .single(); @@ -305,7 +318,10 @@ private Mono readQuorumAsync( RxDocumentServiceRequest entity, int readQuorum, boolean includePrimary, - ReadMode readMode) { + ReadMode readMode, + AtomicReference cosmosExceptionValueHolder, + AtomicBoolean bailOnBarrierValueHolder) { + if (entity.requestContext.timeoutHelper.isElapsed()) { return Mono.error(new GoneException()); } @@ -327,9 +343,21 @@ private Mono readQuorumAsync( Mono barrierRequestObs = BarrierRequestHelper.createAsync(this.diagnosticsClientContext, entity, this.authorizationTokenProvider, readLsn, globalCommittedLSN); return barrierRequestObs.flatMap( barrierRequest -> { - Mono waitForObs = this.waitForReadBarrierAsync(barrierRequest, false, readQuorum, readLsn, globalCommittedLSN, readMode); + Mono waitForObs = this.waitForReadBarrierAsync(barrierRequest, false, readQuorum, readLsn, globalCommittedLSN, readMode, cosmosExceptionValueHolder, bailOnBarrierValueHolder); return waitForObs.flatMap( waitFor -> { + + if (bailOnBarrierValueHolder.get() && cosmosExceptionValueHolder.get() != null) { + return Mono.just(new ReadQuorumResult( + entity.requestContext.requestChargeTracker, + ReadQuorumResultKind.QuorumNotPossibleInCurrentRegion, + readLsn, + globalCommittedLSN, + storeResult, + storeResponses, + cosmosExceptionValueHolder.get())); + } + if (!waitFor) { return Mono.just(new ReadQuorumResult( entity.requestContext.requestChargeTracker, @@ -616,7 +644,9 @@ private Mono waitForReadBarrierAsync( final int readQuorum, final long readBarrierLsn, final long targetGlobalCommittedLSN, - ReadMode readMode) { + ReadMode readMode, + AtomicReference cosmosExceptionValueHolder, + AtomicBoolean bailFromReadBarrierLoopValueHolder) { AtomicInteger readBarrierRetryCount = new AtomicInteger(maxNumberOfReadBarrierReadRetries); AtomicInteger readBarrierRetryCountMultiRegion = new AtomicInteger(maxBarrierRetriesForMultiRegion); @@ -635,6 +665,32 @@ private Mono waitForReadBarrierAsync( return responsesObs.flux().flatMap( responses -> { + boolean isAvoidQuorumSelectionStoreResult = false; + CosmosException cosmosExceptionFromStoreResult = null; + + if (readBarrierRetryCount.get() == 0) { + return Mono.just(false); + } + + for (StoreResult storeResult : responses) { + if (storeResult.isAvoidQuorumSelectionException) { + isAvoidQuorumSelectionStoreResult = true; + cosmosExceptionFromStoreResult = storeResult.getException(); + break; + } + } + + if (isAvoidQuorumSelectionStoreResult) { + readBarrierRetryCount.decrementAndGet(); + return this.isBarrierMeetPossibleInPresenceOfAvoidQuorumSelectionException( + barrierRequest, + readBarrierLsn, + targetGlobalCommittedLSN, + cosmosExceptionValueHolder, + bailFromReadBarrierLoopValueHolder, + cosmosExceptionFromStoreResult); + } + long maxGlobalCommittedLsnInResponses = responses.size() > 0 ? responses.stream() .mapToLong(response -> response.globalCommittedLSN).max().getAsLong() : 0; @@ -677,6 +733,10 @@ private Mono waitForReadBarrierAsync( return Flux.just(true); } + if (bailFromReadBarrierLoopValueHolder.get()) { + return Flux.just(false); + } + // we will go into global strong read barrier mode for global strong requests after regular barrier calls have been exhausted. if (targetGlobalCommittedLSN > 0) { return Flux.defer(() -> { @@ -685,12 +745,39 @@ private Mono waitForReadBarrierAsync( return Flux.error(new GoneException()); } - Mono> responsesObs = this.storeReader.readMultipleReplicaAsync( - barrierRequest, allowPrimary, readQuorum, - true /*required valid LSN*/, false /*useSessionToken*/, readMode, false /*checkMinLSN*/, true /*forceReadAll*/); + if (readBarrierRetryCountMultiRegion.get() == 0) { + return Flux.just(false); + } + + Mono> responsesObs = this.storeReader.readMultipleReplicaAsync( + barrierRequest, allowPrimary, readQuorum, + true /*required valid LSN*/, false /*useSessionToken*/, readMode, false /*checkMinLSN*/, true /*forceReadAll*/); return responsesObs.flux().flatMap( responses -> { + + boolean isAvoidQuorumSelectionStoreResult = false; + CosmosException cosmosExceptionFromStoreResult = null; + + for (StoreResult storeResult : responses) { + if (storeResult.isAvoidQuorumSelectionException) { + isAvoidQuorumSelectionStoreResult = true; + cosmosExceptionFromStoreResult = storeResult.getException(); + break; + } + } + + if (isAvoidQuorumSelectionStoreResult) { + readBarrierRetryCountMultiRegion.getAndDecrement(); + return this.isBarrierMeetPossibleInPresenceOfAvoidQuorumSelectionException( + barrierRequest, + readBarrierLsn, + targetGlobalCommittedLSN, + cosmosExceptionValueHolder, + bailFromReadBarrierLoopValueHolder, + cosmosExceptionFromStoreResult); + } + long maxGlobalCommittedLsnInResponses = responses.size() > 0 ? responses.stream() .mapToLong(response -> response.globalCommittedLSN).max().getAsLong() : 0; @@ -823,6 +910,91 @@ private boolean isQuorumMet( return isQuorumMet; } + private Mono isBarrierMeetPossibleInPresenceOfAvoidQuorumSelectionException( + RxDocumentServiceRequest barrierRequest, + long readBarrierLsn, + long targetGlobalCommittedLSN, + AtomicReference cosmosExceptionValueHolder, + AtomicBoolean bailFromReadBarrierLoop, + CosmosException cosmosExceptionInStoreResult) { + + return performBarrierOnPrimaryAndDetermineIfBarrierCanBeSatisfied( + barrierRequest, + true, + readBarrierLsn, + targetGlobalCommittedLSN, + cosmosExceptionValueHolder, + bailFromReadBarrierLoop).flatMap(isBarrierFromPrimarySuccessful -> { + + if (isBarrierFromPrimarySuccessful) { + bailFromReadBarrierLoop.set(true); + cosmosExceptionValueHolder.set(null); + + return Mono.just(true); + } + + if (bailFromReadBarrierLoop.get()) { + bailFromReadBarrierLoop.set(true); + cosmosExceptionValueHolder.set(Utils.createCosmosException( + HttpConstants.StatusCodes.SERVICE_UNAVAILABLE, + cosmosExceptionInStoreResult.getSubStatusCode(), + cosmosExceptionInStoreResult, + null)); + return Mono.just(false); + } else { + bailFromReadBarrierLoop.set(false); + cosmosExceptionValueHolder.set(null); + return Mono.empty(); + } + }); + } + + private Mono performBarrierOnPrimaryAndDetermineIfBarrierCanBeSatisfied( + RxDocumentServiceRequest barrierRequest, + boolean requiresValidLsn, + long readBarrierLsn, + long targetGlobalCommittedLSN, + AtomicReference cosmosExceptionValueHolder, + AtomicBoolean bailFromReadBarrierLoop) { + + barrierRequest.requestContext.forceRefreshAddressCache = true; + Mono storeResultObs = this.storeReader.readPrimaryAsync( + barrierRequest, requiresValidLsn, false /*useSessionToken*/); + + return storeResultObs.flatMap(storeResult -> { + if (!storeResult.isValid) { + return Mono.just(false); + } + + boolean hasRequiredLsn = storeResult.lsn >= readBarrierLsn; + boolean hasRequiredGlobalCommittedLsn = + targetGlobalCommittedLSN <= 0 || storeResult.globalCommittedLSN >= targetGlobalCommittedLSN; + + return Mono.just(hasRequiredLsn && hasRequiredGlobalCommittedLsn); + }) + .onErrorResume(throwable -> { + + barrierRequest.requestContext.forceRefreshAddressCache = false; + + if (throwable instanceof CosmosException) { + CosmosException cosmosException = Utils.as(throwable, CosmosException.class); + + if (isAvoidQuorumSelectionException(cosmosException)) { + + bailFromReadBarrierLoop.set(true); + cosmosExceptionValueHolder.set(cosmosException); + return Mono.just(false); + } + + bailFromReadBarrierLoop.set(false); + return Mono.just(false); + } + + bailFromReadBarrierLoop.set(false); + return Mono.just(false); + }); + } + private enum ReadQuorumResultKind { QuorumMet, QuorumSelected, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java index 5169dfb121a7..a6046151b23f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreReader.java @@ -281,36 +281,31 @@ private Flux> readFromReplicas(List resultCollect if (srr.isAvoidQuorumSelectionException) { - // todo: fail fast when barrier requests also hit isAvoidQuorumSelectionException? - // todo: https://github.com/Azure/azure-sdk-for-java/issues/46135 - if (!entity.isBarrierRequest) { + // isAvoidQuorumSelectionException is a special case where we want to enable the enclosing data plane operation + // to fail fast in the region where a quorum selection is being attempted + // no attempts to reselect quorum will be made + if (logger.isDebugEnabled()) { - // isAvoidQuorumSelectionException is a special case where we want to enable the enclosing data plane operation - // to fail fast in the region where a quorum selection is being attempted - // no attempts to reselect quorum will be made - if (logger.isDebugEnabled()) { + int statusCode = srr.getException() != null ? srr.getException().getStatusCode() : 0; + int subStatusCode = srr.getException() != null ? srr.getException().getSubStatusCode() : 0; - int statusCode = srr.getException() != null ? srr.getException().getStatusCode() : 0; - int subStatusCode = srr.getException() != null ? srr.getException().getSubStatusCode() : 0; - - logger.debug("An exception with error code [{}-{}] was observed which means quorum cannot be attained in the current region!", statusCode, subStatusCode); - } + logger.debug("An exception with error code [{}-{}] was observed which means quorum cannot be attained in the current region!", statusCode, subStatusCode); + } - if (!entity.requestContext.performedBackgroundAddressRefresh) { - this.startBackgroundAddressRefresh(entity); - entity.requestContext.performedBackgroundAddressRefresh = true; - } + if (!entity.requestContext.performedBackgroundAddressRefresh) { + this.startBackgroundAddressRefresh(entity); + entity.requestContext.performedBackgroundAddressRefresh = true; + } - // (collect quorum store results if possible) - // for QuorumReader (upstream) to make the final decision on quorum selection - resultCollector.add(srr); + // (collect quorum store results if possible) + // for QuorumReader (upstream) to make the final decision on quorum selection + resultCollector.add(srr); - // Remaining replicas - replicasToRead.set(replicaCountToRead - resultCollector.size()); + // Remaining replicas + replicasToRead.set(replicaCountToRead - resultCollector.size()); - // continue to the next store result - continue; - } + // continue to the next store result + continue; } if (srr.isValid) { @@ -687,6 +682,15 @@ private Mono readPrimaryInternalAsync( true, primaryUriReference.get(), replicaStatusList); + + if (storeTaskException instanceof CosmosException) { + CosmosException cosmosException = (CosmosException) storeTaskException; + + if (com.azure.cosmos.implementation.Exceptions.isAvoidQuorumSelectionException(cosmosException)) { + return Mono.error(cosmosException); + } + } + return Mono.just(storeResult); } catch (CosmosException e) { // RxJava1 doesn't allow throwing checked exception from Observable operators diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java index a4bab8f3edbe..a3da4b9c487e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/StoreResponse.java @@ -205,7 +205,7 @@ public String getHeaderValue(String attribute) { } //NOTE: only used for testing purpose to change the response header value - private void setHeaderValue(String headerName, String value) { + void setHeaderValue(String headerName, String value) { if (this.responseHeaderValues == null || this.responseHeaderNames.length != this.responseHeaderValues.length) { return; } diff --git a/sdk/cosmos/live-platform-matrix.json b/sdk/cosmos/live-platform-matrix.json index 96581316474d..e5771c249c1c 100644 --- a/sdk/cosmos/live-platform-matrix.json +++ b/sdk/cosmos/live-platform-matrix.json @@ -13,8 +13,8 @@ "-Pcircuit-breaker-misc-gateway": "CircuitBreakerMiscGateway", "-Pcircuit-breaker-read-all-read-many": "CircuitBreakerReadAllAndReadMany", "-Pmulti-region": "MultiRegion", + "-Pmulti-region-strong": "MultiRegionStrong", "-Plong": "Long", - "-Pfault-injection-barrier": "Fault-injection-barrier", "-DargLine=\"-Dazure.cosmos.directModeProtocol=Tcp\"": "TCP", "Session": "", "ubuntu": "", @@ -182,9 +182,13 @@ { "DESIRED_CONSISTENCIES": "[\"Strong\"]", "ACCOUNT_CONSISTENCY": "Strong", + "ArmConfig": { + "MultiRegion_Strong": { + "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Strong'; enableMultipleRegions = $true }" + } + }, "PROTOCOLS": "[\"Tcp\"]", - "ProfileFlag": [ "-Pfault-injection-barrier" ], - "ArmTemplateParameters": "@{ enableMultipleWriteLocations = $false; defaultConsistencyLevel = 'Strong'; enableMultipleRegions = $true }", + "ProfileFlag": [ "-Pmulti-region-strong" ], "Agent": { "ubuntu": { "OSVmImage": "env:LINUXVMIMAGE", "Pool": "env:LINUXPOOL" } }