From bfb2d81c59d22bb1836f8f8bf4fc239fc4d5d0f9 Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Tue, 25 Nov 2025 01:46:47 -0800 Subject: [PATCH 1/2] PHOENIX-7738 Tests with random filter expressions and segment scan --- .../apache/phoenix/ddb/BaseSegmentScanIT.java | 495 +++++++++++++++++- .../ConcurrentDifferentSegmentCountIT.java | 11 +- .../phoenix/ddb/ConcurrentSegmentScanIT.java | 11 +- .../org/apache/phoenix/ddb/ScanIndex3IT.java | 30 +- .../apache/phoenix/ddb/SegmentScan1IT.java | 57 ++ .../apache/phoenix/ddb/SegmentScan2IT.java | 55 ++ .../org/apache/phoenix/ddb/SegmentScanIT.java | 129 ++--- 7 files changed, 674 insertions(+), 114 deletions(-) create mode 100644 phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan1IT.java create mode 100644 phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan2IT.java diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java index 6587f62..7e514fc 100644 --- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java +++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/BaseSegmentScanIT.java @@ -34,10 +34,12 @@ import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; -import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutRequest; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; +import software.amazon.awssdk.services.dynamodb.model.WriteRequest; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; @@ -129,7 +131,7 @@ public static void stopLocalDynamoDb() throws Exception { } /** - * Create a test item based on the configured key types. + * Create a test item based on the configured key types with rich attributes for filter testing. 
*/ protected Map createTestItem(int index, String hashKeyName, ScalarAttributeType hashKeyType, String sortKeyName, ScalarAttributeType sortKeyType) { @@ -170,8 +172,54 @@ protected Map createTestItem(int index, String hashKeyNa } item.put("data", AttributeValue.builder().s("test_data_" + index).build()); + item.put("status", AttributeValue.builder().s(getStatus(index)).build()); + item.put("priority", AttributeValue.builder().n(String.valueOf(index % 10)).build()); + item.put("active", AttributeValue.builder().bool(index % 2 == 0).build()); + item.put("amount", AttributeValue.builder().n(String.valueOf((index % 1000) + 1)).build()); + item.put("category", AttributeValue.builder().s(getCategory(index)).build()); + item.put("description", AttributeValue.builder().s("description_" + index).build()); + item.put("timestamp", AttributeValue.builder().n(String.valueOf(1000000 + index)).build()); + + List tags = new ArrayList<>(); + tags.add("tag_" + (index % 5)); + tags.add("common_tag"); + if (index % 3 == 0) { + tags.add("special_tag"); + } + item.put("tags", AttributeValue.builder().ss(tags).build()); + + List scores = new ArrayList<>(); + scores.add(String.valueOf(index % 100)); + scores.add(String.valueOf((index % 50) + 100)); + item.put("scores", AttributeValue.builder().ns(scores).build()); + + Map metadata = new HashMap<>(); + metadata.put("version", AttributeValue.builder().s("v" + (index % 3)).build()); + metadata.put("count", AttributeValue.builder().n(String.valueOf(index % 20)).build()); + metadata.put("enabled", AttributeValue.builder().bool(index % 4 == 0).build()); + item.put("metadata", AttributeValue.builder().m(metadata).build()); + + List items = new ArrayList<>(); + items.add(AttributeValue.builder().s("item_" + (index % 7)).build()); + items.add(AttributeValue.builder().n(String.valueOf(index % 30)).build()); + item.put("items", AttributeValue.builder().l(items).build()); + + if (index % 5 == 0) { + item.put("optional_field", AttributeValue.builder().s("optional_" + index).build()); + } + return item; } + + private String getStatus(int index) { + String[] statuses = {"pending", "active", "completed", "failed", "cancelled"}; + return statuses[index % statuses.length]; + } + + private String getCategory(int index) { + String[] categories = {"electronics", "books", "clothing", "food", "toys", "sports"}; + return categories[index % categories.length]; + } /** * Split the HBase table at a calculated point. @@ -206,17 +254,31 @@ protected void splitTable(String tableName, ScalarAttributeType hashKeyType, int * Perform a full scan with pagination using the specified limit. */ protected List> performFullScanWithPagination(DynamoDbClient client, - String tableName, boolean useFilter) { + String tableName, boolean useFilter, int filterNum) { + return performFullScanWithPagination(client, tableName, useFilter, filterNum, SCAN_LIMIT); + } + + /** + * Perform a full scan with pagination using a custom scan limit. 
+ */ + protected List> performFullScanWithPagination(DynamoDbClient client, + String tableName, boolean useFilter, int filterNum, int scanLimit) { List> allItems = new ArrayList<>(); Map lastEvaluatedKey = null; do { ScanRequest.Builder scanBuilder = ScanRequest.builder() .tableName(tableName) - .limit(SCAN_LIMIT); + .limit(scanLimit); if (useFilter) { - scanBuilder.filterExpression(generateFilterExpression()) - .expressionAttributeNames(getFilterExpressionAttributeNames()) - .expressionAttributeValues(getFilterExpressionAttributeValues()); + scanBuilder.filterExpression(getFilterExpression(filterNum)); + Map attrNames = getFilterAttributeNames(filterNum); + if (!attrNames.isEmpty()) { + scanBuilder.expressionAttributeNames(attrNames); + } + Map attrValues = getFilterAttributeValues(filterNum); + if (!attrValues.isEmpty()) { + scanBuilder.expressionAttributeValues(attrValues); + } } if (lastEvaluatedKey != null) { scanBuilder.exclusiveStartKey(lastEvaluatedKey); @@ -232,20 +294,23 @@ protected List> performFullScanWithPagination(Dynamo * Insert items with periodic table splitting to test segment scan behavior * across different HBase regions. */ - protected void insertItemsWithSplitting(String tableName, String hashKeyName, ScalarAttributeType hashKeyType, - String sortKeyName, ScalarAttributeType sortKeyType, - int totalItems, int splitFrequency, int sleepMillis) throws Exception { + protected void insertItemsWithSplitting(String tableName, String hashKeyName, + ScalarAttributeType hashKeyType, String sortKeyName, ScalarAttributeType sortKeyType, + int totalItems, int splitFrequency, int sleepMillis) throws Exception { + List batch = new ArrayList<>(); for (int i = 0; i < totalItems; i++) { - Map item = createTestItem(i, hashKeyName, hashKeyType, sortKeyName, sortKeyType); - - PutItemRequest putRequest = PutItemRequest.builder() - .tableName(tableName) - .item(item) - .build(); - - phoenixDBClientV2.putItem(putRequest); - dynamoDbClient.putItem(putRequest); - + Map item = + createTestItem(i, hashKeyName, hashKeyType, sortKeyName, sortKeyType); + WriteRequest writeRequest = + WriteRequest.builder().putRequest(PutRequest.builder().item(item).build()) + .build(); + batch.add(writeRequest); + boolean shouldFlush = batch.size() >= 25 || (i > 0 && (i + 1) % splitFrequency == 0) + || i == totalItems - 1; + if (shouldFlush) { + executeBatchWrite(tableName, batch); + batch.clear(); + } // Periodically split the table to create multiple regions if (i > 0 && i % splitFrequency == 0) { splitTable(tableName, hashKeyType, i); @@ -254,6 +319,19 @@ protected void insertItemsWithSplitting(String tableName, String hashKeyName, Sc } } + /** + * Execute batch write for both Phoenix and DynamoDB clients. + */ + private void executeBatchWrite(String tableName, List batch) { + Map> requestItems = new HashMap<>(); + requestItems.put(tableName, new ArrayList<>(batch)); + BatchWriteItemRequest batchRequest = + BatchWriteItemRequest.builder().requestItems(requestItems).build(); + + phoenixDBClientV2.batchWriteItem(batchRequest); + dynamoDbClient.batchWriteItem(batchRequest); + } + /** * Scan a single segment with pagination. 
*/ @@ -262,7 +340,8 @@ protected List> scanSingleSegmentWithPagination(Stri int totalSegments, int scanLimit, boolean addDelay, - boolean useFilter) { + boolean useFilter, + int filterNum) { List> segmentItems = new ArrayList<>(); Map lastEvaluatedKey = null; @@ -273,9 +352,15 @@ protected List> scanSingleSegmentWithPagination(Stri .totalSegments(totalSegments) .limit(scanLimit); if (useFilter) { - scanBuilder.filterExpression(generateFilterExpression()) - .expressionAttributeNames(getFilterExpressionAttributeNames()) - .expressionAttributeValues(getFilterExpressionAttributeValues()); + scanBuilder.filterExpression(getFilterExpression(filterNum)); + Map attrNames = getFilterAttributeNames(filterNum); + if (!attrNames.isEmpty()) { + scanBuilder.expressionAttributeNames(attrNames); + } + Map attrValues = getFilterAttributeValues(filterNum); + if (!attrValues.isEmpty()) { + scanBuilder.expressionAttributeValues(attrValues); + } } if (lastEvaluatedKey != null) { scanBuilder.exclusiveStartKey(lastEvaluatedKey); @@ -299,19 +384,377 @@ protected List> scanSingleSegmentWithPagination(Stri return segmentItems; } + protected String getFilterExpression(int filterNum) { + switch (filterNum) { + case 1: + return getFilterExpression1(); + case 2: + return getFilterExpression2(); + case 3: + return getFilterExpression3(); + case 4: + return getFilterExpression4(); + case 5: + return getFilterExpression5(); + case 6: + return getFilterExpression6(); + case 7: + return getFilterExpression7(); + case 8: + return getFilterExpression8(); + case 9: + return getFilterExpression9(); + case 10: + return getFilterExpression10(); + case 11: + return getFilterExpression11(); + case 12: + return getFilterExpression12(); + case 13: + return getFilterExpression13(); + case 14: + return getFilterExpression14(); + case 15: + return getFilterExpression15(); + default: + return getFilterExpression1(); + } + } + + protected Map getFilterAttributeNames(int filterNum) { + switch (filterNum) { + case 1: + return getFilterAttributeNames1(); + case 2: + return getFilterAttributeNames2(); + case 3: + return getFilterAttributeNames3(); + case 4: + return getFilterAttributeNames4(); + case 5: + return getFilterAttributeNames5(); + case 6: + return getFilterAttributeNames6(); + case 7: + return getFilterAttributeNames7(); + case 8: + return getFilterAttributeNames8(); + case 9: + return getFilterAttributeNames9(); + case 10: + return getFilterAttributeNames10(); + case 11: + return getFilterAttributeNames11(); + case 12: + return getFilterAttributeNames12(); + case 13: + return getFilterAttributeNames13(); + case 14: + return getFilterAttributeNames14(); + case 15: + return getFilterAttributeNames15(); + default: + return getFilterAttributeNames1(); + } + } + + protected Map getFilterAttributeValues(int filterNum) { + switch (filterNum) { + case 1: + return getFilterAttributeValues1(); + case 2: + return getFilterAttributeValues2(); + case 3: + return getFilterAttributeValues3(); + case 4: + return getFilterAttributeValues4(); + case 5: + return getFilterAttributeValues5(); + case 6: + return getFilterAttributeValues6(); + case 7: + return getFilterAttributeValues7(); + case 8: + return getFilterAttributeValues8(); + case 9: + return getFilterAttributeValues9(); + case 10: + return getFilterAttributeValues10(); + case 11: + return getFilterAttributeValues11(); + case 12: + return getFilterAttributeValues12(); + case 13: + return getFilterAttributeValues13(); + case 14: + return getFilterAttributeValues14(); + case 15: + return 
getFilterAttributeValues15(); + default: + return getFilterAttributeValues1(); + } + } + protected String generateFilterExpression() { - return "begins_with(#data, :prefix)"; + return getFilterExpression1(); } protected Map getFilterExpressionAttributeNames() { + return getFilterAttributeNames1(); + } + + protected Map getFilterExpressionAttributeValues() { + return getFilterAttributeValues1(); + } + + protected String getFilterExpression1() { + return "begins_with(#data, :prefix)"; + } + + protected Map getFilterAttributeNames1() { Map expressionAttributeNames = new HashMap<>(); expressionAttributeNames.put("#data", "data"); return expressionAttributeNames; } - protected Map getFilterExpressionAttributeValues() { + protected Map getFilterAttributeValues1() { Map expressionAttributeValues = new HashMap<>(); expressionAttributeValues.put(":prefix", AttributeValue.builder().s("test_data_1").build()); return expressionAttributeValues; } + + protected String getFilterExpression2() { + return "#amount BETWEEN :minAmount AND :maxAmount"; + } + + protected Map getFilterAttributeNames2() { + Map names = new HashMap<>(); + names.put("#amount", "amount"); + return names; + } + + protected Map getFilterAttributeValues2() { + Map values = new HashMap<>(); + values.put(":minAmount", AttributeValue.builder().n("200").build()); + values.put(":maxAmount", AttributeValue.builder().n("800").build()); + return values; + } + + protected String getFilterExpression3() { + return "#active = :isActive AND #priority > :minPriority"; + } + + protected Map getFilterAttributeNames3() { + Map names = new HashMap<>(); + names.put("#active", "active"); + names.put("#priority", "priority"); + return names; + } + + protected Map getFilterAttributeValues3() { + Map values = new HashMap<>(); + values.put(":isActive", AttributeValue.builder().bool(true).build()); + values.put(":minPriority", AttributeValue.builder().n("5").build()); + return values; + } + + protected String getFilterExpression4() { + return "#status IN (:status1, :status2, :status3)"; + } + + protected Map getFilterAttributeNames4() { + Map names = new HashMap<>(); + names.put("#status", "status"); + return names; + } + + protected Map getFilterAttributeValues4() { + Map values = new HashMap<>(); + values.put(":status1", AttributeValue.builder().s("active").build()); + values.put(":status2", AttributeValue.builder().s("pending").build()); + values.put(":status3", AttributeValue.builder().s("completed").build()); + return values; + } + + protected String getFilterExpression5() { + return "contains(#tags, :tag)"; + } + + protected Map getFilterAttributeNames5() { + Map names = new HashMap<>(); + names.put("#tags", "tags"); + return names; + } + + protected Map getFilterAttributeValues5() { + Map values = new HashMap<>(); + values.put(":tag", AttributeValue.builder().s("special_tag").build()); + return values; + } + + protected String getFilterExpression6() { + return "#metadata.#version = :version AND #metadata.#enabled = :enabled"; + } + + protected Map getFilterAttributeNames6() { + Map names = new HashMap<>(); + names.put("#metadata", "metadata"); + names.put("#version", "version"); + names.put("#enabled", "enabled"); + return names; + } + + protected Map getFilterAttributeValues6() { + Map values = new HashMap<>(); + values.put(":version", AttributeValue.builder().s("v1").build()); + values.put(":enabled", AttributeValue.builder().bool(true).build()); + return values; + } + + protected String getFilterExpression7() { + return 
"attribute_exists(optional_field)"; + } + + protected Map getFilterAttributeNames7() { + return new HashMap<>(); + } + + protected Map getFilterAttributeValues7() { + return new HashMap<>(); + } + + protected String getFilterExpression8() { + return "attribute_not_exists(optional_field)"; + } + + protected Map getFilterAttributeNames8() { + return new HashMap<>(); + } + + protected Map getFilterAttributeValues8() { + return new HashMap<>(); + } + + protected String getFilterExpression9() { + return "(#category = :category AND #amount > :minAmount) OR " + + "(#status = :status AND #active = :active)"; + } + + protected Map getFilterAttributeNames9() { + Map names = new HashMap<>(); + names.put("#category", "category"); + names.put("#amount", "amount"); + names.put("#status", "status"); + names.put("#active", "active"); + return names; + } + + protected Map getFilterAttributeValues9() { + Map values = new HashMap<>(); + values.put(":category", AttributeValue.builder().s("electronics").build()); + values.put(":minAmount", AttributeValue.builder().n("500").build()); + values.put(":status", AttributeValue.builder().s("active").build()); + values.put(":active", AttributeValue.builder().bool(true).build()); + return values; + } + + protected String getFilterExpression10() { + return "NOT (#status = :status)"; + } + + protected Map getFilterAttributeNames10() { + Map names = new HashMap<>(); + names.put("#status", "status"); + return names; + } + + protected Map getFilterAttributeValues10() { + Map values = new HashMap<>(); + values.put(":status", AttributeValue.builder().s("failed").build()); + return values; + } + + protected String getFilterExpression11() { + return "size(#items) = :size"; + } + + protected Map getFilterAttributeNames11() { + Map names = new HashMap<>(); + names.put("#items", "items"); + return names; + } + + protected Map getFilterAttributeValues11() { + Map values = new HashMap<>(); + values.put(":size", AttributeValue.builder().n("2").build()); + return values; + } + + protected String getFilterExpression12() { + return "contains(#description, :substring)"; + } + + protected Map getFilterAttributeNames12() { + Map names = new HashMap<>(); + names.put("#description", "description"); + return names; + } + + protected Map getFilterAttributeValues12() { + Map values = new HashMap<>(); + values.put(":substring", AttributeValue.builder().s("description_1").build()); + return values; + } + + protected String getFilterExpression13() { + return "contains(#scores, :score)"; + } + + protected Map getFilterAttributeNames13() { + Map names = new HashMap<>(); + names.put("#scores", "scores"); + return names; + } + + protected Map getFilterAttributeValues13() { + Map values = new HashMap<>(); + values.put(":score", AttributeValue.builder().n("110").build()); + return values; + } + + protected String getFilterExpression14() { + return "#priority >= :minPri AND #priority <= :maxPri AND #category <> :excludeCategory"; + } + + protected Map getFilterAttributeNames14() { + Map names = new HashMap<>(); + names.put("#priority", "priority"); + names.put("#category", "category"); + return names; + } + + protected Map getFilterAttributeValues14() { + Map values = new HashMap<>(); + values.put(":minPri", AttributeValue.builder().n("3").build()); + values.put(":maxPri", AttributeValue.builder().n("7").build()); + values.put(":excludeCategory", AttributeValue.builder().s("food").build()); + return values; + } + + protected String getFilterExpression15() { + return "#timestamp > :startTime AND 
#timestamp < :endTime"; + } + + protected Map getFilterAttributeNames15() { + Map names = new HashMap<>(); + names.put("#timestamp", "timestamp"); + return names; + } + + protected Map getFilterAttributeValues15() { + Map values = new HashMap<>(); + values.put(":startTime", AttributeValue.builder().n("1010000").build()); + values.put(":endTime", AttributeValue.builder().n("1015000").build()); + return values; + } } diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java index 852c767..60f3206 100644 --- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java +++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentDifferentSegmentCountIT.java @@ -69,8 +69,10 @@ private void runConcurrentDifferentSegmentCountTest(String hashKeyName, ScalarAt TOTAL_ITEMS, SPLIT_FREQUENCY, 75); // Perform full scan for comparison baseline - List> fullScanItemsPhoenix = performFullScanWithPagination(phoenixDBClientV2, tableName, false); - List> fullScanItemsDDB = performFullScanWithPagination(dynamoDbClient, tableName, false); + List> fullScanItemsPhoenix = + performFullScanWithPagination(phoenixDBClientV2, tableName, false, 0); + List> fullScanItemsDDB = + performFullScanWithPagination(dynamoDbClient, tableName, false, 0); // Execute concurrent segment scans with different totalSegments List> segmentScan3Items = performConcurrentSegmentScansWithDifferentCounts(tableName); @@ -141,8 +143,9 @@ private List> performAllSegmentScans(String tableNam // Scan each segment sequentially within this thread for (int segment = 0; segment < totalSegments; segment++) { - List> segmentItems = - scanSingleSegmentWithPagination(tableName, segment, totalSegments, SCAN_LIMIT, false, false); + List> segmentItems = + scanSingleSegmentWithPagination(tableName, segment, totalSegments, SCAN_LIMIT, + false, false, 0); allItems.addAll(segmentItems); LOGGER.info("Segment {}/{} scan found {} items", segment, totalSegments, segmentItems.size()); } diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java index bd94d00..d26beab 100644 --- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java +++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ConcurrentSegmentScanIT.java @@ -70,8 +70,10 @@ private void runConcurrentSegmentScanTest(String hashKeyName, ScalarAttributeTyp TOTAL_ITEMS, SPLIT_FREQUENCY, 50); // Perform full scan for comparison - List> fullScanItemsPhoenix = performFullScanWithPagination(phoenixDBClientV2, tableName, false); - List> fullScanItemsDDB = performFullScanWithPagination(dynamoDbClient, tableName, false); + List> fullScanItemsPhoenix = + performFullScanWithPagination(phoenixDBClientV2, tableName, false, 0); + List> fullScanItemsDDB = + performFullScanWithPagination(dynamoDbClient, tableName, false, 0); // Execute concurrent segment scans with concurrent splits List> concurrentSegmentItems = @@ -138,8 +140,9 @@ private List> performConcurrentSegmentScansWithSplit } private List> scanSegmentWithPagination(String tableName, int segment) { - List> segmentItems = - scanSingleSegmentWithPagination(tableName, segment, TOTAL_SEGMENTS, SCAN_LIMIT, true, false); + List> segmentItems = + scanSingleSegmentWithPagination(tableName, segment, TOTAL_SEGMENTS, SCAN_LIMIT, + true, 
false, 0); LOGGER.info("Segment {} scan completed with {} items", segment, segmentItems.size()); return segmentItems; } diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java index c66b988..0dd7a78 100644 --- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java +++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/ScanIndex3IT.java @@ -40,13 +40,15 @@ import org.junit.rules.TestName; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest; import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest; -import software.amazon.awssdk.services.dynamodb.model.PutItemRequest; +import software.amazon.awssdk.services.dynamodb.model.PutRequest; import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest; +import software.amazon.awssdk.services.dynamodb.model.WriteRequest; import static org.apache.phoenix.query.BaseTest.setUpConfigForMiniCluster; @@ -87,6 +89,18 @@ public static void initialize() throws Exception { createTableAndInsertData(); } + private static void executeBatchWrite(String tableName, List batch) { + if (batch.isEmpty()) { + return; + } + Map> requestItems = new HashMap<>(); + requestItems.put(tableName, new ArrayList<>(batch)); + BatchWriteItemRequest batchRequest = + BatchWriteItemRequest.builder().requestItems(requestItems).build(); + phoenixDBClientV2.batchWriteItem(batchRequest); + dynamoDbClient.batchWriteItem(batchRequest); + } + private static void createTableAndInsertData() { CreateTableRequest createTableRequest = DDLTestUtils.getCreateTableRequest( TABLE_NAME, "pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N); @@ -96,6 +110,7 @@ private static void createTableAndInsertData() { phoenixDBClientV2.createTable(createTableRequest); dynamoDbClient.createTable(createTableRequest); + List batch = new ArrayList<>(); for (int i = 0; i < NUM_RECORDS; i++) { Map item = new HashMap<>(); item.put("pk", AttributeValue.builder().s("pk_" + (i % 100)).build()); @@ -126,10 +141,15 @@ private static void createTableAndInsertData() { nestedList.add(AttributeValue.builder().n(String.valueOf(i % 40)).build()); nestedList.add(AttributeValue.builder().ss("set_in_list_" + (i % 3), "list_common").build()); item.put("itms", AttributeValue.builder().l(nestedList).build()); - - PutItemRequest putRequest = PutItemRequest.builder().tableName(TABLE_NAME).item(item).build(); - phoenixDBClientV2.putItem(putRequest); - dynamoDbClient.putItem(putRequest); + + WriteRequest writeRequest = + WriteRequest.builder().putRequest(PutRequest.builder().item(item).build()) + .build(); + batch.add(writeRequest); + if (batch.size() >= 25 || i == NUM_RECORDS - 1) { + executeBatchWrite(TABLE_NAME, batch); + batch.clear(); + } } for (int i = 0; i < NUM_RECORDS; i++) { diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan1IT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan1IT.java new file mode 100644 index 0000000..70fb38a --- /dev/null +++ 
b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan1IT.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.ddb; + +import java.util.Arrays; +import java.util.Collection; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; + +/** + * Segment scan tests - Part 1. + */ +@RunWith(Parameterized.class) +public class SegmentScan1IT extends SegmentScanIT { + + public SegmentScan1IT(String hashKeyName, ScalarAttributeType hashKeyType, String sortKeyName, + ScalarAttributeType sortKeyType, boolean useFilter) { + super(hashKeyName, hashKeyType, sortKeyName, sortKeyType, useFilter); + } + + @Parameterized.Parameters(name = "Hash_{1}_Sort_{3}_Filter_{4}") + public static Collection data() { + return Arrays.asList(new Object[][] { + // Hash key only + {"pk", ScalarAttributeType.S, null, null, true}, + {"pk", ScalarAttributeType.N, null, null, false}, + {"pk", ScalarAttributeType.B, null, null, true}, + + // String Hash Key combinations + {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.S, false}, + {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N, true}, + {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.B, false}, + + // Number Hash Key combinations + {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.S, true}, + {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.N, false}, + {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B, true} + }); + } +} diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan2IT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan2IT.java new file mode 100644 index 0000000..28e922c --- /dev/null +++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScan2IT.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.ddb; + +import java.util.Arrays; +import java.util.Collection; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; + +/** + * Segment scan tests - Part 2. + */ +@RunWith(Parameterized.class) +public class SegmentScan2IT extends SegmentScanIT { + + public SegmentScan2IT(String hashKeyName, ScalarAttributeType hashKeyType, String sortKeyName, + ScalarAttributeType sortKeyType, boolean useFilter) { + super(hashKeyName, hashKeyType, sortKeyName, sortKeyType, useFilter); + } + + @Parameterized.Parameters(name = "Hash_{1}_Sort_{3}_Filter_{4}") + public static Collection data() { + return Arrays.asList(new Object[][] { + // Binary Hash Key combinations + {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S, false}, + {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.N, true}, + {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.B, false}, + + // Additional combinations + {"pk", ScalarAttributeType.S, null, null, true}, + {"pk", ScalarAttributeType.N, null, null, false}, + {"pk", ScalarAttributeType.B, null, null, true}, + {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S, false}, + {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N, true}, + {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B, false} + }); + } +} diff --git a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java index f325de7..8a168c3 100644 --- a/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java +++ b/phoenix-ddb-rest/src/test/java/org/apache/phoenix/ddb/SegmentScanIT.java @@ -19,91 +19,48 @@ import java.sql.SQLException; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Random; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest; import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType; /** - * Comprehensive parametrized test for segment scan functionality. - * - * This test verifies that segment scans work correctly across different key configurations - * ,segment counts, limits and filters. For each test run: - * 1. Creates a table with specified key schema - * 2. Inserts items while periodically splitting the underlying HBase table - * 3. Performs both full scan and segment scan with pagination, limits and filters - * 4. Verifies that both approaches return identical results to ddb + * Abstract base class for segment scan tests. 
*/ -@RunWith(Parameterized.class) -public class SegmentScanIT extends BaseSegmentScanIT { - - private final String hashKeyName; - private final ScalarAttributeType hashKeyType; - private final String sortKeyName; - private final ScalarAttributeType sortKeyType; - private final int totalSegments; - private final int scanLimit; - private final boolean useFilter; - protected static final int TOTAL_ITEMS = 2000; - protected static final int SPLIT_FREQUENCY = 500; +public abstract class SegmentScanIT extends BaseSegmentScanIT { + + protected final String hashKeyName; + protected final ScalarAttributeType hashKeyType; + protected final String sortKeyName; + protected final ScalarAttributeType sortKeyType; + protected final int totalSegments; + protected final boolean useFilter; + protected final int filterNum; + protected static final int TOTAL_ITEMS = 25000; + protected static final int SPLIT_FREQUENCY = 4000; public SegmentScanIT(String hashKeyName, ScalarAttributeType hashKeyType, String sortKeyName, ScalarAttributeType sortKeyType, - int totalSegments, int scanLimit, boolean useFilter) { + boolean useFilter) { this.hashKeyName = hashKeyName; this.hashKeyType = hashKeyType; this.sortKeyName = sortKeyName; this.sortKeyType = sortKeyType; - this.totalSegments = totalSegments; - this.scanLimit = scanLimit; + Random random = new Random(); + this.totalSegments = random.nextInt(8) + 1; this.useFilter = useFilter; + this.filterNum = useFilter ? random.nextInt(15) + 1 : 0; } - @Parameterized.Parameters(name = "Hash_{1}_Sort_{3}_Segments_{4}_Limit_{5}_Filter_{6}") - public static Collection data() { - return Arrays.asList(new Object[][] { - // Hash key only - {"pk", ScalarAttributeType.S, null, null, 3, 34, true}, - {"pk", ScalarAttributeType.N, null, null, 4, 52, false}, - {"pk", ScalarAttributeType.B, null, null, 2, 41, true}, - - // String Hash Key combinations - {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.S, 1, 28, false}, - {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N, 2, 59, true}, - {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.B, 4, 45, false}, - - // Number Hash Key combinations - {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.S, 1, 23, true}, - {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.N, 2, 37, false}, - {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B, 3, 51, true}, - - // Binary Hash Key combinations - {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S, 2, 29, false}, - {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.N, 4, 46, true}, - {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.B, 3, 33, false}, - - // Total Segments > #regions - {"pk", ScalarAttributeType.S, null, null, 5, 58, true}, - {"pk", ScalarAttributeType.N, null, null, 6, 25, false}, - {"pk", ScalarAttributeType.B, null, null, 7, 42, true}, - {"pk", ScalarAttributeType.B, "sk", ScalarAttributeType.S, 5, 31, false}, - {"pk", ScalarAttributeType.S, "sk", ScalarAttributeType.N, 6, 55, true}, - {"pk", ScalarAttributeType.N, "sk", ScalarAttributeType.B, 7, 38, false}, - }); - } - - - @Test(timeout = 300000) + @Test(timeout = 600000) public void testSegmentScan() throws Exception { - final String tableName = testName.getMethodName().replaceAll("[\\[\\]]", "_"); + final String tableName = testName.getMethodName().replaceAll("[\\[\\]]", "_") + + "_" + generateRandomString(7); // Create table CreateTableRequest createTableRequest = DDLTestUtils.getCreateTableRequest( @@ -112,33 +69,45 @@ public void testSegmentScan() throws Exception { 
dynamoDbClient.createTable(createTableRequest); // Insert items with periodic table splitting - insertItemsWithSplitting(tableName, hashKeyName, hashKeyType, sortKeyName, sortKeyType, - TOTAL_ITEMS, SPLIT_FREQUENCY, 100); + insertItemsWithSplitting(tableName, hashKeyName, hashKeyType, sortKeyName, sortKeyType, + TOTAL_ITEMS, SPLIT_FREQUENCY, 200); + + Random random = new Random(); // Perform full scan with pagination - List> fullScanItemsPhoenix = performFullScanWithPagination(phoenixDBClientV2, tableName, useFilter); - List> fullScanItemsDDB = performFullScanWithPagination(dynamoDbClient, tableName, useFilter); + List> fullScanItemsPhoenix = + performFullScanWithPagination(phoenixDBClientV2, tableName, useFilter, filterNum, + random.nextInt(150) + 1); + List> fullScanItemsDDB = + performFullScanWithPagination(dynamoDbClient, tableName, useFilter, filterNum, + random.nextInt(200) + 1); // Perform segment scan serially on all segments with pagination - List> segmentScanItems = performSegmentScanWithPagination(tableName, useFilter); + List> segmentScanItems = + performSegmentScanWithPagination(tableName, useFilter, filterNum); // Verify results - TestUtils.verifyItemsEqual(fullScanItemsDDB, fullScanItemsPhoenix, hashKeyName, sortKeyName); - TestUtils.verifyItemsEqual(fullScanItemsPhoenix, segmentScanItems, hashKeyName, sortKeyName); + TestUtils.verifyItemsEqual(fullScanItemsDDB, fullScanItemsPhoenix, hashKeyName, + sortKeyName); + TestUtils.verifyItemsEqual(fullScanItemsPhoenix, segmentScanItems, hashKeyName, + sortKeyName); } /** * Perform segment scan across all segments with pagination. */ - private List> performSegmentScanWithPagination(String tableName, boolean useFilter) - throws SQLException { + private List> performSegmentScanWithPagination(String tableName, + boolean useFilter, int filterNum) throws SQLException { List> allItems = new ArrayList<>(); + Random random = new Random(); // Scan each segment sequentially int numRegions = TestUtils.getNumberOfTableRegions(testConnection, tableName); for (int segment = 0; segment < totalSegments; segment++) { - List> segmentItems = - scanSingleSegmentWithPagination(tableName, segment, totalSegments, scanLimit, false, useFilter); + int scanLimit = random.nextInt(200) + 1; + List> segmentItems = + scanSingleSegmentWithPagination(tableName, segment, totalSegments, scanLimit, + false, useFilter, filterNum); allItems.addAll(segmentItems); if (segment < numRegions && !useFilter) { Assert.assertFalse(segmentItems.isEmpty()); @@ -146,4 +115,14 @@ private List> performSegmentScanWithPagination(Strin } return allItems; } -} \ No newline at end of file + + private String generateRandomString(int length) { + String chars = "abcdefghijklmnopqrstuvwxyz0123456789-_."; + Random random = new Random(); + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append(chars.charAt(random.nextInt(chars.length()))); + } + return sb.toString(); + } +} From bb971d24dd2e5a9aa9f89743032b9ca2d2bdb25a Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Fri, 28 Nov 2025 11:27:00 +0530 Subject: [PATCH 2/2] autolink jira --- .asf.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.asf.yaml b/.asf.yaml index 672a419..b35d86c 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -40,6 +40,8 @@ github: squash: true merge: false rebase: false + autolink_jira: + - PHOENIX notifications: commits: commits@phoenix.apache.org issues: issues@phoenix.apache.org
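
A note on the invariant these tests pin down: for any totalSegments, the union of the items returned by scanning segments 0..totalSegments-1 (each paginated via LastEvaluatedKey) must match an unsegmented scan of the same table, with or without a filter expression. The following is a minimal sketch of that client-side pattern against the AWS SDK v2 API the patch uses; the table name, the example filter, and the already-initialized DynamoDbClient are assumptions for illustration, not part of the patch.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;

final class SegmentScanSketch {

    // Scan one segment to completion, following LastEvaluatedKey pagination.
    static List<Map<String, AttributeValue>> scanSegment(DynamoDbClient client,
            String tableName, int segment, int totalSegments, int limit) {
        // Placeholder maps: '#' aliases attribute names (guards reserved words),
        // ':' binds literal values into the expression. The filter itself is a
        // stand-in for any of the 15 expressions defined in BaseSegmentScanIT.
        Map<String, String> names = new HashMap<>();
        names.put("#status", "status");
        Map<String, AttributeValue> values = new HashMap<>();
        values.put(":status", AttributeValue.builder().s("active").build());

        List<Map<String, AttributeValue>> items = new ArrayList<>();
        Map<String, AttributeValue> startKey = null;
        do {
            ScanRequest.Builder builder = ScanRequest.builder()
                    .tableName(tableName)
                    .segment(segment)              // which slice of the key space
                    .totalSegments(totalSegments)  // how many slices in total
                    .limit(limit)
                    .filterExpression("#status = :status")
                    .expressionAttributeNames(names)
                    .expressionAttributeValues(values);
            if (startKey != null) {
                builder.exclusiveStartKey(startKey);
            }
            ScanResponse response = client.scan(builder.build());
            items.addAll(response.items());
            startKey = response.hasLastEvaluatedKey() ? response.lastEvaluatedKey() : null;
        } while (startKey != null);
        return items;
    }
}

Because a filter expression is evaluated server-side on each page after the items are read, it applies to a segment scan exactly as it does to a full scan; that is why the filtered segment union and the filtered full scan must still agree, and why LastEvaluatedKey can be non-empty even when a page returns zero matching items.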
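The insert helpers flush in chunks of 25 because that is the BatchWriteItem per-request cap. Against DynamoDB Local a batch normally succeeds in full, which is why executeBatchWrite above can ignore the response; the API contract, however, permits partial success, and a variant intended for a real endpoint would retry unprocessedItems. A hedged sketch of such a helper follows (the retry/backoff policy is an assumption, not something the patch implements):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

final class BatchWriteSketch {

    // Write one chunk (at most 25 requests), retrying whatever the service
    // returns as unprocessed until the whole chunk has been accepted.
    static void writeWithRetry(DynamoDbClient client, String tableName,
            List<WriteRequest> chunk) throws InterruptedException {
        Map<String, List<WriteRequest>> pending = new HashMap<>();
        pending.put(tableName, chunk);
        long backoffMillis = 50L;
        while (!pending.isEmpty()) {
            BatchWriteItemResponse response = client.batchWriteItem(
                    BatchWriteItemRequest.builder().requestItems(pending).build());
            pending = response.unprocessedItems();
            if (!pending.isEmpty()) {
                Thread.sleep(backoffMillis);  // back off before resubmitting
                backoffMillis = Math.min(backoffMillis * 2, 1000L);
            }
        }
    }
}

A capped exponential backoff keeps retries from hammering a throttled table while still converging quickly in the common case where only a few write requests bounce.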