From 446b4cccf97bf5fa9c15826909286e6fba7a49ac Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Thu, 16 Oct 2025 21:22:59 -0700 Subject: [PATCH 01/19] Extend IntegrationTestBackup restore into a base class with continuous and non-continuous subclasses Change-Id: I0c70c417b86c7732b58642a51c75897c35b16cb6 --- .../backup/IntegrationTestBackupRestore.java | 111 ++++++++++ .../IntegrationTestBackupRestoreBase.java} | 199 ++++++++---------- ...ntegrationTestContinuousBackupRestore.java | 123 +++++++++++ 3 files changed, 316 insertions(+), 117 deletions(-) create mode 100644 hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java rename hbase-it/src/test/java/org/apache/hadoop/hbase/{IntegrationTestBackupRestore.java => backup/IntegrationTestBackupRestoreBase.java} (69%) create mode 100644 hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java new file mode 100644 index 000000000000..cedc7bfa3e47 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; + +/** + * An integration test to detect regressions in HBASE-7912. 
Create a table with many regions, load + * data, perform series backup/load operations, then restore and verify data + * @see HBASE-7912 + * @see HBASE-14123 + */ +@Category(IntegrationTests.class) +public class IntegrationTestBackupRestore extends IntegrationTestBackupRestoreBase { + private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName(); + protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class); + + @Override + @Before + public void setUp() throws Exception { + util = new IntegrationTestingUtility(); + Configuration conf = util.getConfiguration(); + regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); + regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT); + rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION); + numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); + numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); + sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); + enableBackup(conf); + LOG.info("Initializing cluster with {} region servers.", regionServerCount); + util.initializeCluster(regionServerCount); + LOG.info("Cluster initialized and ready"); + + backupRootDir = util.getDataTestDirOnTestFS() + Path.SEPARATOR + backupRootDir; + } + + @Test + public void testBackupRestore() throws Exception { + LOG.info("Running backup and restore integration test with continuous backup disabled"); + createTables(CLASS_NAME); + runTestMulti(false); + } + + /** Returns status of CLI execution */ + @Override + public int runTestFromCommandLine() throws Exception { + // Check if backup is enabled + if (!BackupManager.isBackupEnabled(getConf())) { + System.err.println(BackupRestoreConstants.ENABLE_BACKUP); + return -1; + } + System.out.println(BackupRestoreConstants.VERIFY_BACKUP); + testBackupRestore(); + return 0; + } + + @Override + protected void addOptions() { + super.addOptions(); + addOptWithArg(REGIONSERVER_COUNT_KEY, + "Total number of region servers. 
Default: '" + DEFAULT_REGIONSERVER_COUNT + "'"); + } + + @Override + protected void processOptions(CommandLine cmd) { + super.processOptions(cmd); + regionServerCount = Integer.parseInt( + cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT))); + + LOG.info(MoreObjects.toStringHelper("Parsed Options") + .add(REGION_COUNT_KEY, regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount) + .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations) + .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString()); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(), args); + System.exit(status); + } +} diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java similarity index 69% rename from hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java rename to hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index f910df672009..db75067400e7 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestBackupRestore.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -15,9 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase; +package org.apache.hadoop.hbase.backup; import static org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -28,13 +29,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.backup.BackupAdmin; -import org.apache.hadoop.hbase.backup.BackupInfo; -import org.apache.hadoop.hbase.backup.BackupInfo.BackupState; -import org.apache.hadoop.hbase.backup.BackupRequest; -import org.apache.hadoop.hbase.backup.BackupRestoreConstants; -import org.apache.hadoop.hbase.backup.BackupType; -import org.apache.hadoop.hbase.backup.RestoreRequest; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.IntegrationTestBase; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; @@ -42,39 +40,25 @@ import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy; import org.apache.hadoop.hbase.chaos.policies.Policy; -import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; -import 
org.apache.hadoop.hbase.testclassification.IntegrationTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.util.ToolRunner; import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; -/** - * An integration test to detect regressions in HBASE-7912. Create a table with many regions, load - * data, perform series backup/load operations, then restore and verify data - * @see HBASE-7912 - * @see HBASE-14123 - */ -@Category(IntegrationTests.class) -public class IntegrationTestBackupRestore extends IntegrationTestBase { - private static final String CLASS_NAME = IntegrationTestBackupRestore.class.getSimpleName(); - protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestore.class); +public abstract class IntegrationTestBackupRestoreBase extends IntegrationTestBase { + protected static final Logger LOG = + LoggerFactory.getLogger(IntegrationTestBackupRestoreBase.class); protected static final String NUMBER_OF_TABLES_KEY = "num_tables"; protected static final String COLUMN_NAME = "f"; protected static final String REGION_COUNT_KEY = "regions_per_rs"; @@ -100,7 +84,7 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase { protected long sleepTime; protected static Object lock = new Object(); - private static String BACKUP_ROOT_DIR = "backupIT"; + protected String backupRootDir = "backupRootDir"; /* * This class is used to run the backup and restore thread(s). 
Throwing an exception in this @@ -110,10 +94,12 @@ public class IntegrationTestBackupRestore extends IntegrationTestBase { */ protected class BackupAndRestoreThread implements Runnable { private final TableName table; + private final boolean isContinuousBackupEnabled; private Throwable throwable; - public BackupAndRestoreThread(TableName table) { + public BackupAndRestoreThread(TableName table, boolean isContinuousBackupEnabled) { this.table = table; + this.isContinuousBackupEnabled = isContinuousBackupEnabled; this.throwable = null; } @@ -124,7 +110,7 @@ public Throwable getThrowable() { @Override public void run() { try { - runTestSingle(this.table); + runTestSingle(this.table, isContinuousBackupEnabled); } catch (Throwable t) { LOG.error( "An error occurred in thread {} when performing a backup and restore with table {}: ", @@ -134,23 +120,6 @@ public void run() { } } - @Override - @Before - public void setUp() throws Exception { - util = new IntegrationTestingUtility(); - Configuration conf = util.getConfiguration(); - regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); - regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT); - rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION); - numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); - numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); - sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); - enableBackup(conf); - LOG.info("Initializing cluster with {} region servers.", regionServerCount); - util.initializeCluster(regionServerCount); - LOG.info("Cluster initialized and ready"); - } - @After public void tearDown() throws IOException { LOG.info("Cleaning up after test."); @@ -178,42 +147,48 @@ private void deleteTablesIfAny() throws IOException { } } - private void createTables() throws Exception { + protected void createTables(String tableBaseName) throws Exception { tableNames = new TableName[numTables]; for (int i = 0; i < numTables; i++) { - tableNames[i] = TableName.valueOf(CLASS_NAME + ".table." + i); + tableNames[i] = TableName.valueOf(tableBaseName + ".table." 
+ i); } for (TableName table : tableNames) { createTable(table); } } - private void enableBackup(Configuration conf) { + protected void enableBackup(Configuration conf) { // Enable backup conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); BackupManager.decorateMasterConfiguration(conf); BackupManager.decorateRegionServerConfiguration(conf); } - private void cleanUpBackupDir() throws IOException { - FileSystem fs = FileSystem.get(util.getConfiguration()); - fs.delete(new Path(BACKUP_ROOT_DIR), true); + protected void createAndSetBackupWalDir(IntegrationTestingUtility util, Configuration conf) + throws IOException { + Path root = util.getDataTestDirOnTestFS(); + Path backupWalDir = new Path(root, "backupWALDir"); + FileSystem fs = FileSystem.get(conf); + fs.mkdirs(backupWalDir); + conf.set(CONF_CONTINUOUS_BACKUP_WAL_DIR, backupWalDir.toString()); + LOG.info( + "The continuous backup WAL directory has been created and set in the configuration to: {}", + backupWalDir); } - @Test - public void testBackupRestore() throws Exception { - BACKUP_ROOT_DIR = util.getDataTestDirOnTestFS() + Path.SEPARATOR + BACKUP_ROOT_DIR; - createTables(); - runTestMulti(); + private void cleanUpBackupDir() throws IOException { + FileSystem fs = FileSystem.get(util.getConfiguration()); + fs.delete(new Path(backupRootDir), true); } - private void runTestMulti() throws Exception { + protected void runTestMulti(boolean isContinuousBackupEnabled) { LOG.info("IT backup & restore started"); Thread[] workers = new Thread[numTables]; BackupAndRestoreThread[] backupAndRestoreThreads = new BackupAndRestoreThread[numTables]; for (int i = 0; i < numTables; i++) { final TableName table = tableNames[i]; - BackupAndRestoreThread backupAndRestoreThread = new BackupAndRestoreThread(table); + BackupAndRestoreThread backupAndRestoreThread = + new BackupAndRestoreThread(table, isContinuousBackupEnabled); backupAndRestoreThreads[i] = backupAndRestoreThread; workers[i] = new Thread(backupAndRestoreThread); workers[i].start(); @@ -279,75 +254,91 @@ private void merge(String[] backupIds, BackupAdmin client) throws IOException { client.mergeBackups(backupIds); } - private void runTestSingle(TableName table) throws IOException { - + private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) + throws IOException { + String continuousBackupStatus = isContinuousBackupEnabled ? 
"enabled" : "disabled"; List backupIds = new ArrayList(); - try (Connection conn = util.getConnection(); Admin admin = conn.getAdmin(); - BackupAdmin client = new BackupAdminImpl(conn);) { - - // #0- insert some data to table 'table' + try (Connection conn = util.getConnection(); BackupAdmin client = new BackupAdminImpl(conn)) { loadData(table, rowsInIteration); // #1 - create full backup for table first - LOG.info("create full backup image for {}", table); + LOG.info("Creating full backup image for {} with continuous backup {}", table, + continuousBackupStatus); List tables = Lists.newArrayList(table); BackupRequest.Builder builder = new BackupRequest.Builder(); BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables) - .withTargetRootDir(BACKUP_ROOT_DIR).build(); + .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled) + .build(); - String backupIdFull = backup(request, client); - assertTrue(checkSucceeded(backupIdFull)); + String fullBackupId = backup(request, client); + assertTrue(checkSucceeded(fullBackupId)); - backupIds.add(backupIdFull); + backupIds.add(fullBackupId); // Now continue with incremental backups int count = 1; - while (count++ < numIterations) { - - // Load data + String incrementalBackupId; + while (count++ <= numIterations) { + LOG.info("{} - Starting iteration {} of {}", Thread.currentThread().getName(), count - 1, + numIterations); loadData(table, rowsInIteration); + // Do incremental backup + LOG.info("Creating incremental backup number {} with continuous backup {} for {}", + count - 1, continuousBackupStatus, table); builder = new BackupRequest.Builder(); request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables) - .withTargetRootDir(BACKUP_ROOT_DIR).build(); - String backupId = backup(request, client); - assertTrue(checkSucceeded(backupId)); - backupIds.add(backupId); + .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled) + .build(); + incrementalBackupId = backup(request, client); + assertTrue(checkSucceeded(incrementalBackupId)); + backupIds.add(incrementalBackupId); // Restore incremental backup for table, with overwrite for previous backup + if ((count - 2) > 0) { + LOG.info("Restoring {} using second most recent incremental backup", table); + } else { + LOG.info("Restoring {} using original full backup", table); + } String previousBackupId = backupIds.get(backupIds.size() - 2); - restoreVerifyTable(conn, client, table, previousBackupId, rowsInIteration * (count - 1)); + restoreTableAndVerifyRowCount(conn, client, table, previousBackupId, + (long) rowsInIteration * (count - 1)); + // Restore incremental backup for table, with overwrite for last backup - restoreVerifyTable(conn, client, table, backupId, rowsInIteration * count); + LOG.info("Restoring {} using most recent incremental backup", table); + restoreTableAndVerifyRowCount(conn, client, table, incrementalBackupId, + (long) rowsInIteration * count); + LOG.info("{} - Finished iteration {} of {}", Thread.currentThread().getName(), count - 1, + numIterations); } // Now merge all incremental and restore - String[] incBackupIds = allIncremental(backupIds); - merge(incBackupIds, client); + String[] incrementalBackupIds = getAllIncrementalBackupIds(backupIds); + merge(incrementalBackupIds, client); // Restore last one - String backupId = incBackupIds[incBackupIds.length - 1]; + incrementalBackupId = incrementalBackupIds[incrementalBackupIds.length - 1]; // restore incremental backup for 
table, with overwrite - TableName[] tablesRestoreIncMultiple = new TableName[] { table }; - restore(createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, - true), client); + TableName[] tablesToRestoreFrom = new TableName[] { table }; + restore(createRestoreRequest(backupRootDir, incrementalBackupId, false, tablesToRestoreFrom, + null, true), client); Table hTable = conn.getTable(table); - Assert.assertEquals(util.countRows(hTable), rowsInIteration * numIterations); + Assert.assertEquals(rowsInIteration * (numIterations + 1), + HBaseTestingUtil.countRows(hTable)); hTable.close(); - LOG.info("{} loop {} finished.", Thread.currentThread().getName(), (count - 1)); } } - private void restoreVerifyTable(Connection conn, BackupAdmin client, TableName table, + private void restoreTableAndVerifyRowCount(Connection conn, BackupAdmin client, TableName table, String backupId, long expectedRows) throws IOException { TableName[] tablesRestoreIncMultiple = new TableName[] { table }; restore( - createRestoreRequest(BACKUP_ROOT_DIR, backupId, false, tablesRestoreIncMultiple, null, true), + createRestoreRequest(backupRootDir, backupId, false, tablesRestoreIncMultiple, null, true), client); Table hTable = conn.getTable(table); - Assert.assertEquals(expectedRows, util.countRows(hTable)); + Assert.assertEquals(expectedRows, HBaseTestingUtil.countRows(hTable)); hTable.close(); } - private String[] allIncremental(List backupIds) { + private String[] getAllIncrementalBackupIds(List backupIds) { int size = backupIds.size(); backupIds = backupIds.subList(1, size); String[] arr = new String[size - 1]; @@ -361,7 +352,7 @@ protected boolean checkSucceeded(String backupId) throws IOException { if (status == null) { return false; } - return status.getState() == BackupState.COMPLETE; + return status.getState() == BackupInfo.BackupState.COMPLETE; } private BackupInfo getBackupInfo(String backupId) throws IOException { @@ -396,19 +387,6 @@ public void setUpCluster() throws Exception { LOG.debug("Done initializing/checking cluster"); } - /** Returns status of CLI execution */ - @Override - public int runTestFromCommandLine() throws Exception { - // Check if backup is enabled - if (!BackupManager.isBackupEnabled(getConf())) { - System.err.println(BackupRestoreConstants.ENABLE_BACKUP); - return -1; - } - System.out.println(BackupRestoreConstants.VERIFY_BACKUP); - testBackupRestore(); - return 0; - } - @Override public TableName getTablename() { // That is only valid when Monkey is CALM (no monkey) @@ -423,8 +401,6 @@ protected Set getColumnFamilies() { @Override protected void addOptions() { - addOptWithArg(REGIONSERVER_COUNT_KEY, - "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'"); addOptWithArg(REGION_COUNT_KEY, "Total number of regions. Default: " + DEFAULT_REGION_COUNT); addOptWithArg(ROWS_PER_ITERATION_KEY, "Total number of data rows to be loaded during one iteration." 
+ " Default: " @@ -452,17 +428,6 @@ protected void processOptions(CommandLine cmd) { cmd.getOptionValue(NUMBER_OF_TABLES_KEY, Integer.toString(DEFAULT_NUMBER_OF_TABLES))); sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, Long.toString(SLEEP_TIME_DEFAULT))); - - LOG.info(MoreObjects.toStringHelper("Parsed Options") - .add(REGION_COUNT_KEY, regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount) - .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations) - .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString()); } - public static void main(String[] args) throws Exception { - Configuration conf = HBaseConfiguration.create(); - IntegrationTestingUtility.setUseDistributedCluster(conf); - int status = ToolRunner.run(conf, new IntegrationTestBackupRestore(), args); - System.exit(status); - } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java new file mode 100644 index 000000000000..373a7e8d3ce9 --- /dev/null +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.backup; + +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_BACKUP_MAX_WAL_SIZE; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INITIAL_DELAY; +import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.CONF_STAGED_WAL_FLUSH_INTERVAL; +import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES; +import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_MISSING_FILES; +import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.IntegrationTestingUtility; +import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.testclassification.IntegrationTests; +import org.apache.hadoop.util.ToolRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; +import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; + +/** + * An integration test to detect regressions in HBASE-7912. 
Create a table with many regions, load + * data, perform series backup/load operations, then restore and verify data + * @see HBASE-7912 + * @see HBASE-14123 + */ +@Category(IntegrationTests.class) +public class IntegrationTestContinuousBackupRestore extends IntegrationTestBackupRestoreBase { + private static final String CLASS_NAME = + IntegrationTestContinuousBackupRestore.class.getSimpleName(); + protected static final Logger LOG = + LoggerFactory.getLogger(IntegrationTestContinuousBackupRestore.class); + + @Override + @Before + public void setUp() throws Exception { + util = new IntegrationTestingUtility(); + Configuration conf = util.getConfiguration(); + regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); + // We are using only 1 region server because we cannot wait for all region servers to catch up + // with replication. Therefore, we cannot be sure about how many rows will be restored after an + // incremental backup. + regionServerCount = 1; + rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION); + numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); + numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); + sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); + enableBackup(conf); + conf.set(CONF_BACKUP_MAX_WAL_SIZE, "10240"); + conf.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); + conf.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); + createAndSetBackupWalDir(util, conf); + conf.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); + conf.setBoolean(IGNORE_EMPTY_FILES, true); + conf.setBoolean(IGNORE_MISSING_FILES, true); + + LOG.info("Initializing cluster with {} region server(s)", regionServerCount); + util.initializeCluster(regionServerCount); + LOG.info("Cluster initialized and ready"); + + backupRootDir = util.getDataTestDirOnTestFS() + Path.SEPARATOR + backupRootDir; + LOG.info("The backup root directory is: {}", backupRootDir); + } + + @Test + public void testContinuousBackupRestore() throws Exception { + LOG.info("Running backup and restore integration test with continuous backup enabled"); + createTables(CLASS_NAME); + runTestMulti(true); + } + + /** Returns status of CLI execution */ + @Override + public int runTestFromCommandLine() throws Exception { + // Check if backup is enabled + if (!BackupManager.isBackupEnabled(getConf())) { + System.err.println(BackupRestoreConstants.ENABLE_BACKUP); + return -1; + } + System.out.println(BackupRestoreConstants.VERIFY_BACKUP); + testContinuousBackupRestore(); + return 0; + } + + @Override + protected void processOptions(CommandLine cmd) { + super.processOptions(cmd); + + LOG.info( + MoreObjects.toStringHelper("Parsed Options").add(REGION_COUNT_KEY, regionsCountPerServer) + .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations) + .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString()); + } + + public static void main(String[] args) throws Exception { + Configuration conf = HBaseConfiguration.create(); + IntegrationTestingUtility.setUseDistributedCluster(conf); + int status = ToolRunner.run(conf, new IntegrationTestContinuousBackupRestore(), args); + System.exit(status); + } +} From 4339ffd6b4b1a999a8bd9860c64da6d9d50c026d Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Fri, 17 Oct 2025 18:35:09 -0700 Subject: [PATCH 02/19] Add more test cases to runTestSingle for testContinuousBackupRestore Change-Id: Id043400bf85c7b696bb94bef7cb17ed9dad13334 --- 
.../hadoop/hbase/backup/BackupTestUtil.java | 19 +++- .../hbase/backup/TestContinuousBackup.java | 17 +--- hbase-it/pom.xml | 7 ++ .../backup/IntegrationTestBackupRestore.java | 7 +- .../IntegrationTestBackupRestoreBase.java | 86 +++++++++++++++---- ...ntegrationTestContinuousBackupRestore.java | 8 +- 6 files changed, 105 insertions(+), 39 deletions(-) diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java index 3665eeb7a76c..37abf3580166 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java @@ -17,14 +17,20 @@ */ package org.apache.hadoop.hbase.backup; +import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.backup.impl.BackupManager; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private @@ -46,10 +52,21 @@ static BackupInfo verifyBackup(Configuration conf, String backupId, BackupType e } } - static void enableBackup(Configuration conf) { + public static void enableBackup(Configuration conf) { // Enable backup conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); BackupManager.decorateMasterConfiguration(conf); BackupManager.decorateRegionServerConfiguration(conf); } + + public static void verifyReplicationPeerSubscription(HBaseTestingUtil util, TableName tableName) throws IOException { + try (Admin admin = util.getAdmin()) { + ReplicationPeerDescription peerDesc = admin.listReplicationPeers().stream() + .filter(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER)).findFirst() + .orElseThrow(() -> new AssertionError("Replication peer not found")); + + assertTrue("Table should be subscribed to the replication peer", + peerDesc.getPeerConfig().getTableCFsMap().containsKey(tableName)); + } + } } diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java index 2fdfa8b73f8b..e76bfa0c090e 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java @@ -114,7 +114,7 @@ public void testContinuousBackupWithFullBackup() throws Exception { } // Verify replication peer subscription - verifyReplicationPeerSubscription(tableName); + BackupTestUtil.verifyReplicationPeerSubscription(TEST_UTIL, tableName); // Verify table is registered in Backup System Table verifyTableInBackupSystemTable(tableName); @@ -157,8 +157,8 @@ public void testContinuousBackupForMultipleTables() throws Exception { } // Verify replication peer subscription for each table - verifyReplicationPeerSubscription(tableName1); - verifyReplicationPeerSubscription(tableName2); + 
BackupTestUtil.verifyReplicationPeerSubscription(TEST_UTIL, tableName1); + BackupTestUtil.verifyReplicationPeerSubscription(TEST_UTIL, tableName2); // Verify tables are registered in Backup System Table verifyTableInBackupSystemTable(tableName1); @@ -248,17 +248,6 @@ public void testContinuousBackupWithIncrementalBackup() throws Exception { } } - private void verifyReplicationPeerSubscription(TableName table) throws IOException { - try (Admin admin = TEST_UTIL.getAdmin()) { - ReplicationPeerDescription peerDesc = admin.listReplicationPeers().stream() - .filter(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER)).findFirst() - .orElseThrow(() -> new AssertionError("Replication peer not found")); - - assertTrue("Table should be subscribed to the replication peer", - peerDesc.getPeerConfig().getTableCFsMap().containsKey(table)); - } - } - String[] buildBackupArgs(String backupType, TableName[] tables, boolean continuousEnabled) { String tableNames = Arrays.stream(tables).map(TableName::getNameAsString).collect(Collectors.joining(",")); diff --git a/hbase-it/pom.xml b/hbase-it/pom.xml index 5bc9af02782e..a282fcc28d5c 100644 --- a/hbase-it/pom.xml +++ b/hbase-it/pom.xml @@ -60,6 +60,13 @@ test-jar test + + org.apache.hbase + hbase-backup + ${project.version} + test-jar + test + org.apache.hbase hbase-logging diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java index cedc7bfa3e47..c363ee9501c1 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.backup; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.IntegrationTestingUtility; @@ -48,19 +49,21 @@ public class IntegrationTestBackupRestore extends IntegrationTestBackupRestoreBa @Before public void setUp() throws Exception { util = new IntegrationTestingUtility(); - Configuration conf = util.getConfiguration(); + conf = util.getConfiguration(); regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT); rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION); numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); - enableBackup(conf); + BackupTestUtil.enableBackup(conf); LOG.info("Initializing cluster with {} region servers.", regionServerCount); util.initializeCluster(regionServerCount); LOG.info("Cluster initialized and ready"); backupRootDir = util.getDataTestDirOnTestFS() + Path.SEPARATOR + backupRootDir; + LOG.info("The backup root directory is: {}", backupRootDir); + fs = FileSystem.get(conf); } @Test diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index db75067400e7..3f35b7d51bee 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ 
b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -20,22 +20,24 @@ import static org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Set; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.IntegrationTestBase; -import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; -import org.apache.hadoop.hbase.backup.impl.BackupManager; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; +import org.apache.hadoop.hbase.backup.util.BackupUtils; import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction; import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey; import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy; @@ -47,6 +49,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; import org.junit.After; import org.junit.Assert; import org.slf4j.Logger; @@ -68,8 +71,8 @@ public abstract class IntegrationTestBackupRestoreBase extends IntegrationTestBa protected static final int DEFAULT_REGION_COUNT = 10; protected static final int DEFAULT_REGIONSERVER_COUNT = 5; protected static final int DEFAULT_NUMBER_OF_TABLES = 1; - protected static final int DEFAULT_NUM_ITERATIONS = 10; - protected static final int DEFAULT_ROWS_IN_ITERATION = 10000; + protected static final int DEFAULT_NUM_ITERATIONS = 1; + protected static final int DEFAULT_ROWS_IN_ITERATION = 100; protected static final String SLEEP_TIME_KEY = "sleeptime"; // short default interval because tests don't run very long. 
protected static final long SLEEP_TIME_DEFAULT = 50000L; @@ -84,6 +87,7 @@ public abstract class IntegrationTestBackupRestoreBase extends IntegrationTestBa protected long sleepTime; protected static Object lock = new Object(); + protected FileSystem fs; protected String backupRootDir = "backupRootDir"; /* @@ -157,14 +161,7 @@ protected void createTables(String tableBaseName) throws Exception { } } - protected void enableBackup(Configuration conf) { - // Enable backup - conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); - BackupManager.decorateMasterConfiguration(conf); - BackupManager.decorateRegionServerConfiguration(conf); - } - - protected void createAndSetBackupWalDir(IntegrationTestingUtility util, Configuration conf) + protected void createAndSetBackupWalDir() throws IOException { Path root = util.getDataTestDirOnTestFS(); Path backupWalDir = new Path(root, "backupWALDir"); @@ -255,7 +252,7 @@ private void merge(String[] backupIds, BackupAdmin client) throws IOException { } private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) - throws IOException { + throws IOException, InterruptedException { String continuousBackupStatus = isContinuousBackupEnabled ? "enabled" : "disabled"; List backupIds = new ArrayList(); @@ -274,6 +271,39 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) String fullBackupId = backup(request, client); assertTrue(checkSucceeded(fullBackupId)); + verifySnapshotExists(table, fullBackupId); + + if (isContinuousBackupEnabled) { + BackupTestUtil.verifyReplicationPeerSubscription(util, table); + String backupWALDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); + LOG.info("kevin: got backupWALDir = {}", backupWALDir); + Path backupWALs = new Path(backupWALDir, "WALs"); + LOG.info("kevin: backupWALs = {}", backupWALs); + assertTrue("There should be a WALs directory inside of the backup WAL directory at: " + backupWALDir, + fs.exists(backupWALs)); + long currentTimeMs = System.currentTimeMillis(); + String currentDateUTC = BackupUtils.formatToDateString(currentTimeMs); + Path walPartitionDir = new Path(backupWALs, currentDateUTC); + // TODO - wait for WAL date partition directory to appear + // do something like checking for existence and then waiting for 1 second up to 10 times + + // TODO - verify the contents of WAL partition dir (backupWALDir/WALs/2025-10-18) + // it should have something like this: + // backupWALDir/WALs/2025-10-17/wal_file.1760738249595.1880be89-0b69-4bad-8d0e-acbf25c63b7e + + Thread.sleep(10000); // put this here to wait for backupWALDir/WALs/2025-10-18 dir + assertTrue("The a backup WALs subdirectory with today's date should exist: ", fs.exists(walPartitionDir)); + + + FileStatus[] fileStatuses = fs.listStatus(backupWALs); + for (FileStatus fileStatus : fileStatuses) { + LOG.info("kevin: fileStatus = {}", fileStatus); + } + } + + LOG.info("kevin: sleeping"); + Thread.sleep(30*60*1000); + backupIds.add(fullBackupId); // Now continue with incremental backups int count = 1; @@ -318,7 +348,7 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) incrementalBackupId = incrementalBackupIds[incrementalBackupIds.length - 1]; // restore incremental backup for table, with overwrite TableName[] tablesToRestoreFrom = new TableName[] { table }; - restore(createRestoreRequest(backupRootDir, incrementalBackupId, false, tablesToRestoreFrom, + restore(createRestoreRequest(incrementalBackupId, false, tablesToRestoreFrom, null, true), client); Table hTable = 
conn.getTable(table); Assert.assertEquals(rowsInIteration * (numIterations + 1), @@ -327,11 +357,29 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) } } + private void verifySnapshotExists(TableName tableName, String backupId) throws IOException { + RemoteIterator fileStatusIterator = fs.listFiles(new Path(backupRootDir, backupId), true); + Path dataManifestPath = null; + while (fileStatusIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusIterator.next(); + if (fileStatus.getPath().getName().endsWith("data.manifest")) { + dataManifestPath = fileStatus.getPath(); + LOG.info( + "Found snapshot manifest for table '{}' at: {}", + tableName, dataManifestPath); + } + } + + if (dataManifestPath == null) { + fail("Could not find snapshot manifest for table '" + tableName + "'"); + } + } + private void restoreTableAndVerifyRowCount(Connection conn, BackupAdmin client, TableName table, String backupId, long expectedRows) throws IOException { TableName[] tablesRestoreIncMultiple = new TableName[] { table }; restore( - createRestoreRequest(backupRootDir, backupId, false, tablesRestoreIncMultiple, null, true), + createRestoreRequest(backupId, false, tablesRestoreIncMultiple, null, true), client); Table hTable = conn.getTable(table); Assert.assertEquals(expectedRows, HBaseTestingUtil.countRows(hTable)); @@ -363,7 +411,6 @@ private BackupInfo getBackupInfo(String backupId) throws IOException { /** * Get restore request. - * @param backupRootDir directory where backup is located * @param backupId backup ID * @param check check the backup * @param fromTables table names to restore from @@ -371,7 +418,7 @@ private BackupInfo getBackupInfo(String backupId) throws IOException { * @param isOverwrite overwrite the table(s) * @return an instance of RestoreRequest */ - public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check, + public RestoreRequest createRestoreRequest(String backupId, boolean check, TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { RestoreRequest.Builder builder = new RestoreRequest.Builder(); return builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check) @@ -381,7 +428,8 @@ public RestoreRequest createRestoreRequest(String backupRootDir, String backupId @Override public void setUpCluster() throws Exception { util = getTestingUtil(getConf()); - enableBackup(getConf()); + conf = getConf(); + BackupTestUtil.enableBackup(conf); LOG.debug("Initializing/checking cluster has {} servers", regionServerCount); util.initializeCluster(regionServerCount); LOG.debug("Done initializing/checking cluster"); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java index 373a7e8d3ce9..19838eed3fa2 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java @@ -25,6 +25,7 @@ import static org.apache.hadoop.hbase.replication.regionserver.ReplicationMarkerChore.REPLICATION_MARKER_ENABLED_KEY; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.IntegrationTestingUtility; @@ -57,7 +58,7 @@ public class 
IntegrationTestContinuousBackupRestore extends IntegrationTestBacku @Before public void setUp() throws Exception { util = new IntegrationTestingUtility(); - Configuration conf = util.getConfiguration(); + conf = util.getConfiguration(); regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); // We are using only 1 region server because we cannot wait for all region servers to catch up // with replication. Therefore, we cannot be sure about how many rows will be restored after an @@ -67,11 +68,10 @@ public void setUp() throws Exception { numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); - enableBackup(conf); + BackupTestUtil.enableBackup(conf); conf.set(CONF_BACKUP_MAX_WAL_SIZE, "10240"); conf.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); conf.set(CONF_STAGED_WAL_FLUSH_INTERVAL, "10"); - createAndSetBackupWalDir(util, conf); conf.setBoolean(REPLICATION_MARKER_ENABLED_KEY, true); conf.setBoolean(IGNORE_EMPTY_FILES, true); conf.setBoolean(IGNORE_MISSING_FILES, true); @@ -82,6 +82,8 @@ public void setUp() throws Exception { backupRootDir = util.getDataTestDirOnTestFS() + Path.SEPARATOR + backupRootDir; LOG.info("The backup root directory is: {}", backupRootDir); + createAndSetBackupWalDir(); + fs = FileSystem.get(conf); } @Test From b0caea68265311c70ce281ab5545b22b079fd635 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Mon, 20 Oct 2025 17:52:58 -0700 Subject: [PATCH 03/19] Add more test cases for full continuous backup; Change while loop to a for loop Change-Id: I5ba3276919e6bbdf343c134fa287c69f3854a8a2 --- .../IntegrationTestBackupRestoreBase.java | 153 +++++++++++++----- 1 file changed, 111 insertions(+), 42 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 3f35b7d51bee..61a03079f11e 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -26,6 +27,7 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.Scanner; import java.util.Set; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -49,7 +51,6 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet; import org.junit.After; import org.junit.Assert; import org.slf4j.Logger; @@ -251,9 +252,13 @@ private void merge(String[] backupIds, BackupAdmin client) throws IOException { client.mergeBackups(backupIds); } + private void delete(String[] backupIds, BackupAdmin client) throws IOException { + client.deleteBackups(backupIds); + } + private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) throws IOException, InterruptedException { - String 
continuousBackupStatus = isContinuousBackupEnabled ? "enabled" : "disabled"; + String enabledOrDisabled = isContinuousBackupEnabled ? "enabled" : "disabled"; List backupIds = new ArrayList(); try (Connection conn = util.getConnection(); BackupAdmin client = new BackupAdminImpl(conn)) { @@ -261,7 +266,7 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) // #1 - create full backup for table first LOG.info("Creating full backup image for {} with continuous backup {}", table, - continuousBackupStatus); + enabledOrDisabled); List tables = Lists.newArrayList(table); BackupRequest.Builder builder = new BackupRequest.Builder(); BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables) @@ -270,77 +275,60 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) String fullBackupId = backup(request, client); assertTrue(checkSucceeded(fullBackupId)); + LOG.info("Created full backup with ID: {}", fullBackupId); verifySnapshotExists(table, fullBackupId); + // Run verifications specific to continuous backup if (isContinuousBackupEnabled) { BackupTestUtil.verifyReplicationPeerSubscription(util, table); - String backupWALDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); - LOG.info("kevin: got backupWALDir = {}", backupWALDir); - Path backupWALs = new Path(backupWALDir, "WALs"); - LOG.info("kevin: backupWALs = {}", backupWALs); - assertTrue("There should be a WALs directory inside of the backup WAL directory at: " + backupWALDir, - fs.exists(backupWALs)); - long currentTimeMs = System.currentTimeMillis(); - String currentDateUTC = BackupUtils.formatToDateString(currentTimeMs); - Path walPartitionDir = new Path(backupWALs, currentDateUTC); - // TODO - wait for WAL date partition directory to appear - // do something like checking for existence and then waiting for 1 second up to 10 times - - // TODO - verify the contents of WAL partition dir (backupWALDir/WALs/2025-10-18) - // it should have something like this: - // backupWALDir/WALs/2025-10-17/wal_file.1760738249595.1880be89-0b69-4bad-8d0e-acbf25c63b7e - - Thread.sleep(10000); // put this here to wait for backupWALDir/WALs/2025-10-18 dir - assertTrue("The a backup WALs subdirectory with today's date should exist: ", fs.exists(walPartitionDir)); - - - FileStatus[] fileStatuses = fs.listStatus(backupWALs); - for (FileStatus fileStatus : fileStatuses) { - LOG.info("kevin: fileStatus = {}", fileStatus); - } + Path backupWALs = verifyWALsDirectoryExists(); + Path walPartitionDir = verifyWALPartitionDirExists(backupWALs); + verifyBackupWALFiles(walPartitionDir); } - LOG.info("kevin: sleeping"); - Thread.sleep(30*60*1000); - backupIds.add(fullBackupId); + // Now continue with incremental backups - int count = 1; String incrementalBackupId; - while (count++ <= numIterations) { - LOG.info("{} - Starting iteration {} of {}", Thread.currentThread().getName(), count - 1, + for (int count = 1; count <= numIterations; count++) { + LOG.info("{} - Starting incremental backup iteration {} of {}", Thread.currentThread().getName(), count, numIterations); loadData(table, rowsInIteration); // Do incremental backup LOG.info("Creating incremental backup number {} with continuous backup {} for {}", - count - 1, continuousBackupStatus, table); + count, enabledOrDisabled, table); builder = new BackupRequest.Builder(); request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables) .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled) .build(); 
incrementalBackupId = backup(request, client); assertTrue(checkSucceeded(incrementalBackupId)); + LOG.info("Created incremental backup with ID: {}", incrementalBackupId); backupIds.add(incrementalBackupId); // Restore incremental backup for table, with overwrite for previous backup - if ((count - 2) > 0) { - LOG.info("Restoring {} using second most recent incremental backup", table); + String previousBackupId = backupIds.get(backupIds.size() - 2); + if (previousBackupId.equals(fullBackupId)) { + LOG.info("Restoring {} using original full backup with ID: {}", table, previousBackupId); } else { - LOG.info("Restoring {} using original full backup", table); + LOG.info("Restoring {} using second most recent incremental backup with ID: {}", + table, previousBackupId); } - String previousBackupId = backupIds.get(backupIds.size() - 2); restoreTableAndVerifyRowCount(conn, client, table, previousBackupId, - (long) rowsInIteration * (count - 1)); + (long) rowsInIteration * count); // Restore incremental backup for table, with overwrite for last backup - LOG.info("Restoring {} using most recent incremental backup", table); + LOG.info("Restoring {} using most recent incremental backup with ID: {}", + table, incrementalBackupId); restoreTableAndVerifyRowCount(conn, client, table, incrementalBackupId, - (long) rowsInIteration * count); - LOG.info("{} - Finished iteration {} of {}", Thread.currentThread().getName(), count - 1, + (long) rowsInIteration * (count + 1)); + LOG.info("{} - Finished incremental backup iteration {} of {}", + Thread.currentThread().getName(), count, numIterations); } + // Now merge all incremental and restore String[] incrementalBackupIds = getAllIncrementalBackupIds(backupIds); merge(incrementalBackupIds, client); @@ -354,9 +342,37 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) Assert.assertEquals(rowsInIteration * (numIterations + 1), HBaseTestingUtil.countRows(hTable)); hTable.close(); + + LOG.info("kevin: backupRootDir before delete"); + FileStatus[] fileStatuses = fs.listStatus(new Path(backupRootDir)); + for (FileStatus fileStatus : fileStatuses) { + LOG.info("kevin: before delete fileStatus = {}", fileStatus); + } + + // Delete the full backup + delete(new String[] {fullBackupId}, client); + + // TODO + // 1. why are both the full backup and the incremental backup being deleted? + // 2. look at the file structure inside of the incremental backup directory and see + // how it differs from the full backup's structure + // 3. + + LOG.info("kevin: backupRootDir after delete"); + fileStatuses = fs.listStatus(new Path(backupRootDir)); + for (FileStatus fileStatus : fileStatuses) { + LOG.info("kevin: after delete fileStatus = {}", fileStatus); + } } } + private void runInputScanner() { + Scanner scanner = new Scanner(System.in); + System.out.println("kevin: Waiting for the user to input a line"); + String line = scanner.nextLine(); + System.out.printf("kevin: Got a line: '%s'. 
Continuing%n", line); + } + private void verifySnapshotExists(TableName tableName, String backupId) throws IOException { RemoteIterator fileStatusIterator = fs.listFiles(new Path(backupRootDir, backupId), true); Path dataManifestPath = null; @@ -375,6 +391,59 @@ private void verifySnapshotExists(TableName tableName, String backupId) throws I } } + private Path verifyWALsDirectoryExists() throws IOException { + String backupWALDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); + Path backupWALs = new Path(backupWALDir, "WALs"); + assertTrue("There should be a WALs directory inside of the backup WAL directory at: " + + backupWALDir, fs.exists(backupWALs)); + return backupWALs; + } + + /** + * Waits for a WAL partition directory to exist inside the backup WAL directory. The directory + * should be something like: .../backupWALDir/WALs/2025-10-17. The directory's existence is + * either eventually asserted, or an assertion error is thrown if it does not exist past the wait + * deadline. This verification is to be used for full backups with continuous backup enabled. + * @param backupWALs The directory that should contain the partition directory. + * i.e. .../backupWALDir/WALs + * @return The Path to the WAL partition directory + */ + private Path verifyWALPartitionDirExists(Path backupWALs) throws IOException, + InterruptedException { + long currentTimeMs = System.currentTimeMillis(); + String currentDateUTC = BackupUtils.formatToDateString(currentTimeMs); + Path walPartitionDir = new Path(backupWALs, currentDateUTC); + int waitTimeSec = 30; + while (true) { + try { + assertTrue("A backup WALs subdirectory with today's date should exist: " + + walPartitionDir, fs.exists(walPartitionDir)); + break; + } catch (AssertionError e) { + if ((System.currentTimeMillis() - currentTimeMs) >= waitTimeSec*1000) { + throw new AssertionError(e); + } + LOG.info("Waiting up to {} seconds for WAL partition directory to exist: {}", + waitTimeSec, walPartitionDir); + Thread.sleep(1000); + } + } + return walPartitionDir; + } + + private void verifyBackupWALFiles(Path walPartitionDir) throws IOException { + FileStatus[] fileStatuses = fs.listStatus(walPartitionDir); + for (FileStatus fileStatus : fileStatuses) { + String walFileName = fileStatus.getPath().getName(); + String[] splitName = walFileName.split("\\."); + assertEquals("The WAL partition directory should only have files that start with 'wal_file'", + "wal_file", splitName[0]); + assertEquals("The timestamp in the WAL file's name should match the date for the WAL partition directory", + walPartitionDir.getName(), BackupUtils.formatToDateString( + Long.parseLong(splitName[1]))); + } + } + private void restoreTableAndVerifyRowCount(Connection conn, BackupAdmin client, TableName table, String backupId, long expectedRows) throws IOException { TableName[] tablesRestoreIncMultiple = new TableName[] { table }; From 8d6340f24da38b028e12d80a26dc79d61bb69849 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Thu, 23 Oct 2025 16:28:02 -0700 Subject: [PATCH 04/19] Add delete test case Change-Id: I25fe484e9c227b7a31cb3768def3c12f66d617ac --- .../IntegrationTestBackupRestoreBase.java | 57 ++++++++++++------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 61a03079f11e..0bb88558beac 100644 --- 
a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.IntegrationTestingUtility.createPreSplitLoadTestTable; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -259,7 +260,7 @@ private void delete(String[] backupIds, BackupAdmin client) throws IOException { private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) throws IOException, InterruptedException { String enabledOrDisabled = isContinuousBackupEnabled ? "enabled" : "disabled"; - List backupIds = new ArrayList(); + List backupIds = new ArrayList<>(); try (Connection conn = util.getConnection(); BackupAdmin client = new BackupAdminImpl(conn)) { loadData(table, rowsInIteration); @@ -325,8 +326,7 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) restoreTableAndVerifyRowCount(conn, client, table, incrementalBackupId, (long) rowsInIteration * (count + 1)); LOG.info("{} - Finished incremental backup iteration {} of {}", - Thread.currentThread().getName(), count, - numIterations); + Thread.currentThread().getName(), count, numIterations); } // Now merge all incremental and restore @@ -343,26 +343,43 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) HBaseTestingUtil.countRows(hTable)); hTable.close(); - LOG.info("kevin: backupRootDir before delete"); - FileStatus[] fileStatuses = fs.listStatus(new Path(backupRootDir)); - for (FileStatus fileStatus : fileStatuses) { - LOG.info("kevin: before delete fileStatus = {}", fileStatus); - } - + deleteMostRecentIncrementalBackup(backupIds, client); + // The full backup and all previous incremental backups should still exist + verifyAllBackupTypesExist(fullBackupId, getAllIncrementalBackupIds(backupIds)); // Delete the full backup delete(new String[] {fullBackupId}, client); + // The full backup and all incremental backups should now be deleted + for (String backupId : backupIds) { + assertFalse("The backup " + backupId + " should no longer exist", + fs.exists(new Path(backupRootDir, backupId))); + } + } + } - // TODO - // 1. why are both the full backup and the incremental backup being deleted? - // 2. look at the file structure inside of the incremental backup directory and see - // how it differs from the full backup's structure - // 3. 
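// A note on the behaviour the assertions in this hunk encode: deleting a full backup
// image is expected to cascade to every incremental image that depends on it, which is
// why the exploratory TODO above observed both directories disappearing. A minimal sketch
// of that check, assuming the Hadoop FileSystem/Path and JUnit imports already used in
// this class (the helper name is illustrative, not part of the patch):
private void assertFullBackupDeleteCascades(FileSystem fs, String backupRootDir,
    String fullBackupId, List<String> allBackupIds, BackupAdmin client) throws IOException {
  // Deleting the full image should also remove every incremental image rooted at it
  client.deleteBackups(new String[] { fullBackupId });
  for (String backupId : allBackupIds) {
    assertFalse("Backup image should have been removed: " + backupId,
      fs.exists(new Path(backupRootDir, backupId)));
  }
}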
+ private void deleteMostRecentIncrementalBackup(List backupIds, BackupAdmin client) + throws IOException { + String incrementalBackupId = backupIds.get(backupIds.size() - 1); + assertTrue("Final incremental backup " + incrementalBackupId + + " should still exist inside of " + backupRootDir, + fs.exists(new Path(backupRootDir, incrementalBackupId))); - LOG.info("kevin: backupRootDir after delete"); - fileStatuses = fs.listStatus(new Path(backupRootDir)); - for (FileStatus fileStatus : fileStatuses) { - LOG.info("kevin: after delete fileStatus = {}", fileStatus); - } + delete(new String[] {incrementalBackupId}, client); + backupIds.remove(backupIds.size() - 1); + + assertFalse("Final incremental backup " + incrementalBackupId + + " should no longer exist inside of " + backupRootDir, + fs.exists(new Path(backupRootDir, incrementalBackupId))); + } + + private void verifyAllBackupTypesExist(String fullBackupId, String[] incrementalBackups) + throws IOException { + // The full backup should still exist + assertTrue("Full backup " + fullBackupId + " should still exist inside of " + backupRootDir, + fs.exists(new Path(backupRootDir, fullBackupId))); + // All other incremental backups should still exist + for (String backupId : incrementalBackups) { + assertTrue("Incremental backup " + backupId + " should still exist inside of " + backupRootDir, + fs.exists(new Path(backupRootDir, backupId))); } } @@ -418,8 +435,10 @@ private Path verifyWALPartitionDirExists(Path backupWALs) throws IOException, try { assertTrue("A backup WALs subdirectory with today's date should exist: " + walPartitionDir, fs.exists(walPartitionDir)); + // The directory exists - stop waiting break; } catch (AssertionError e) { + // Reach here when the directory currently does not exist if ((System.currentTimeMillis() - currentTimeMs) >= waitTimeSec*1000) { throw new AssertionError(e); } From 372cfe33b34e29fdb8b12e134951c3fd80f95141 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Thu, 23 Oct 2025 18:18:23 -0700 Subject: [PATCH 05/19] Start adding changes after looking at WIP PR in GitHub Change-Id: Ie9aece8a3ec55739d618ebf2d2f173a41a116eb6 --- .../hadoop/hbase/backup/BackupTestUtil.java | 1 - .../IntegrationTestBackupRestoreBase.java | 43 ++++++++++++------- ...ntegrationTestContinuousBackupRestore.java | 8 ++-- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java index 37abf3580166..7b93d3fcc4af 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java @@ -53,7 +53,6 @@ static BackupInfo verifyBackup(Configuration conf, String backupId, BackupType e } public static void enableBackup(Configuration conf) { - // Enable backup conf.setBoolean(BackupRestoreConstants.BACKUP_ENABLE_KEY, true); BackupManager.decorateMasterConfiguration(conf); BackupManager.decorateRegionServerConfiguration(conf); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 0bb88558beac..37eb2238f372 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -61,6 +61,14 @@ import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; +/** + * An abstract base class that is used to run backup, restore, and delete integration tests. This class + * performs both full backups and incremental backups. Both continuous backup and non-continuous + * backup test cases are supported. The number of incremental backups performed depends on the number + * of iterations defined by the user. The class performs the backup/restore in a separate thread, where + * one thread is created per table. The number of tables is user-defined, along with other various + * configurations. + */ public abstract class IntegrationTestBackupRestoreBase extends IntegrationTestBase { protected static final Logger LOG = LoggerFactory.getLogger(IntegrationTestBackupRestoreBase.class); @@ -73,8 +81,8 @@ public abstract class IntegrationTestBackupRestoreBase extends IntegrationTestBa protected static final int DEFAULT_REGION_COUNT = 10; protected static final int DEFAULT_REGIONSERVER_COUNT = 5; protected static final int DEFAULT_NUMBER_OF_TABLES = 1; - protected static final int DEFAULT_NUM_ITERATIONS = 1; - protected static final int DEFAULT_ROWS_IN_ITERATION = 100; + protected static final int DEFAULT_NUM_ITERATIONS = 10; + protected static final int DEFAULT_ROWS_IN_ITERATION = 10000; protected static final String SLEEP_TIME_KEY = "sleeptime"; // short default interval because tests don't run very long. protected static final long SLEEP_TIME_DEFAULT = 50000L; @@ -95,8 +103,8 @@ public abstract class IntegrationTestBackupRestoreBase extends IntegrationTestBa /* * This class is used to run the backup and restore thread(s). Throwing an exception in this * thread will not cause the test to fail, so the purpose of this class is to both kick off the - * backup and restore and record any exceptions that occur so they can be thrown in the main - * thread. + * backup and restore, as well as record any exceptions that occur so they can be thrown in the + * main thread. 
*/ protected class BackupAndRestoreThread implements Runnable { private final TableName table; @@ -116,6 +124,8 @@ public Throwable getThrowable() { @Override public void run() { try { + LOG.info("Running backup and restore test for {} in thread {}", this.table, + Thread.currentThread()); runTestSingle(this.table, isContinuousBackupEnabled); } catch (Throwable t) { LOG.error( @@ -181,7 +191,6 @@ private void cleanUpBackupDir() throws IOException { } protected void runTestMulti(boolean isContinuousBackupEnabled) { - LOG.info("IT backup & restore started"); Thread[] workers = new Thread[numTables]; BackupAndRestoreThread[] backupAndRestoreThreads = new BackupAndRestoreThread[numTables]; for (int i = 0; i < numTables; i++) { @@ -265,7 +274,7 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) try (Connection conn = util.getConnection(); BackupAdmin client = new BackupAdminImpl(conn)) { loadData(table, rowsInIteration); - // #1 - create full backup for table first + // First create a full backup for the table LOG.info("Creating full backup image for {} with continuous backup {}", table, enabledOrDisabled); List tables = Lists.newArrayList(table); @@ -280,7 +289,7 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) verifySnapshotExists(table, fullBackupId); - // Run verifications specific to continuous backup + // Run full backup verifications specific to continuous backup if (isContinuousBackupEnabled) { BackupTestUtil.verifyReplicationPeerSubscription(util, table); Path backupWALs = verifyWALsDirectoryExists(); @@ -293,8 +302,8 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) // Now continue with incremental backups String incrementalBackupId; for (int count = 1; count <= numIterations; count++) { - LOG.info("{} - Starting incremental backup iteration {} of {}", Thread.currentThread().getName(), count, - numIterations); + LOG.info("{} - Starting incremental backup iteration {} of {} for {}", Thread.currentThread().getName(), count, + numIterations, table); loadData(table, rowsInIteration); // Do incremental backup @@ -309,7 +318,8 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) LOG.info("Created incremental backup with ID: {}", incrementalBackupId); backupIds.add(incrementalBackupId); - // Restore incremental backup for table, with overwrite for previous backup + // Restore table using backup taken "two backups ago" + // On the first iteration, this backup will be the full backup String previousBackupId = backupIds.get(backupIds.size() - 2); if (previousBackupId.equals(fullBackupId)) { LOG.info("Restoring {} using original full backup with ID: {}", table, previousBackupId); @@ -320,13 +330,13 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) restoreTableAndVerifyRowCount(conn, client, table, previousBackupId, (long) rowsInIteration * count); - // Restore incremental backup for table, with overwrite for last backup + // Restore table using the most recently created incremental backup LOG.info("Restoring {} using most recent incremental backup with ID: {}", table, incrementalBackupId); restoreTableAndVerifyRowCount(conn, client, table, incrementalBackupId, (long) rowsInIteration * (count + 1)); - LOG.info("{} - Finished incremental backup iteration {} of {}", - Thread.currentThread().getName(), count, numIterations); + LOG.info("{} - Finished incremental backup iteration {} of {} for {}", + Thread.currentThread().getName(), count, 
numIterations, table); } // Now merge all incremental and restore @@ -347,6 +357,8 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) // The full backup and all previous incremental backups should still exist verifyAllBackupTypesExist(fullBackupId, getAllIncrementalBackupIds(backupIds)); // Delete the full backup + LOG.info("Deleting full backup: {}. This will also delete any remaining incremental backups", + fullBackupId); delete(new String[] {fullBackupId}, client); // The full backup and all incremental backups should now be deleted for (String backupId : backupIds) { @@ -359,6 +371,7 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) private void deleteMostRecentIncrementalBackup(List backupIds, BackupAdmin client) throws IOException { String incrementalBackupId = backupIds.get(backupIds.size() - 1); + LOG.info("Deleting the most recently created incremental backup: {}", incrementalBackupId); assertTrue("Final incremental backup " + incrementalBackupId + " should still exist inside of " + backupRootDir, fs.exists(new Path(backupRootDir, incrementalBackupId))); @@ -373,10 +386,10 @@ private void deleteMostRecentIncrementalBackup(List backupIds, BackupAdm private void verifyAllBackupTypesExist(String fullBackupId, String[] incrementalBackups) throws IOException { - // The full backup should still exist + // The full backup should exist assertTrue("Full backup " + fullBackupId + " should still exist inside of " + backupRootDir, fs.exists(new Path(backupRootDir, fullBackupId))); - // All other incremental backups should still exist + // All incremental backups should exist for (String backupId : incrementalBackups) { assertTrue("Incremental backup " + backupId + " should still exist inside of " + backupRootDir, fs.exists(new Path(backupRootDir, backupId))); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java index 19838eed3fa2..7ecf25862d68 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java @@ -42,10 +42,10 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; /** - * An integration test to detect regressions in HBASE-7912. Create a table with many regions, load - * data, perform series backup/load operations, then restore and verify data - * @see HBASE-7912 - * @see HBASE-14123 + * An integration test to detect regressions in HBASE-28957. Create a table with many regions, load + * data, perform series backup/load operations with continuous backup enabled, then restore and + * verify data. 
+ * @see HBASE-28957 */ @Category(IntegrationTests.class) public class IntegrationTestContinuousBackupRestore extends IntegrationTestBackupRestoreBase { From a2822ad7ac2a8baa1285b69593dbf6b981ce2ade Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Thu, 23 Oct 2025 19:01:34 -0700 Subject: [PATCH 06/19] Continue adding changes after looking at WIP PR in GitHub Change-Id: Ie345e623089979f028b13aed13e5ec93e025eff8 --- .../IntegrationTestBackupRestoreBase.java | 170 ++++++++++-------- 1 file changed, 97 insertions(+), 73 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 37eb2238f372..85aeb39ad9a7 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -28,7 +28,6 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; -import java.util.Scanner; import java.util.Set; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -173,6 +172,10 @@ protected void createTables(String tableBaseName) throws Exception { } } + /** + * Creates a directory specified by backupWALDir and sets this directory to + * CONF_CONTINUOUS_BACKUP_WAL_DIR in the configuration. + */ protected void createAndSetBackupWalDir() throws IOException { Path root = util.getDataTestDirOnTestFS(); @@ -223,51 +226,8 @@ protected void runTestMulti(boolean isContinuousBackupEnabled) { LOG.info("IT backup & restore finished"); } - private void createTable(TableName tableName) throws Exception { - long startTime, endTime; - - TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); - - TableDescriptor desc = builder.build(); - ColumnFamilyDescriptorBuilder cbuilder = - ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset())); - ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() }; - LOG.info("Creating table {} with {} splits.", tableName, - regionsCountPerServer * regionServerCount); - startTime = EnvironmentEdgeManager.currentTime(); - createPreSplitLoadTestTable(util.getConfiguration(), desc, columns, regionsCountPerServer); - util.waitTableAvailable(tableName); - endTime = EnvironmentEdgeManager.currentTime(); - LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime)); - } - - private void loadData(TableName table, int numRows) throws IOException { - Connection conn = util.getConnection(); - // #0- insert some data to a table - Table t1 = conn.getTable(table); - util.loadRandomRows(t1, new byte[] { 'f' }, 100, numRows); - // flush table - conn.getAdmin().flush(TableName.valueOf(table.getName())); - } - - private String backup(BackupRequest request, BackupAdmin client) throws IOException { - return client.backupTables(request); - } - - private void restore(RestoreRequest request, BackupAdmin client) throws IOException { - client.restore(request); - } - - private void merge(String[] backupIds, BackupAdmin client) throws IOException { - client.mergeBackups(backupIds); - } - - private void delete(String[] backupIds, BackupAdmin client) throws IOException { - client.deleteBackups(backupIds); - } - private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) - throws IOException, InterruptedException { + throws IOException, InterruptedException { 
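// Row-count bookkeeping that the restore assertions in this method rely on (a worked
// example of the arithmetic already present below, not new behaviour): rowsInIteration
// rows are loaded before the full backup, and rowsInIteration more before each incremental
// backup. So after iteration `count` the live table holds rowsInIteration * (count + 1)
// rows, the previous image (the full backup on the first iteration) holds
// rowsInIteration * count rows, and the final restore after merging all incrementals
// expects rowsInIteration * (numIterations + 1) rows.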
String enabledOrDisabled = isContinuousBackupEnabled ? "enabled" : "disabled"; List backupIds = new ArrayList<>(); @@ -368,41 +328,55 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) } } - private void deleteMostRecentIncrementalBackup(List backupIds, BackupAdmin client) - throws IOException { - String incrementalBackupId = backupIds.get(backupIds.size() - 1); - LOG.info("Deleting the most recently created incremental backup: {}", incrementalBackupId); - assertTrue("Final incremental backup " + incrementalBackupId - + " should still exist inside of " + backupRootDir, - fs.exists(new Path(backupRootDir, incrementalBackupId))); + private void createTable(TableName tableName) throws Exception { + long startTime, endTime; - delete(new String[] {incrementalBackupId}, client); - backupIds.remove(backupIds.size() - 1); + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); - assertFalse("Final incremental backup " + incrementalBackupId - + " should no longer exist inside of " + backupRootDir, - fs.exists(new Path(backupRootDir, incrementalBackupId))); + TableDescriptor desc = builder.build(); + ColumnFamilyDescriptorBuilder cbuilder = + ColumnFamilyDescriptorBuilder.newBuilder(COLUMN_NAME.getBytes(Charset.defaultCharset())); + ColumnFamilyDescriptor[] columns = new ColumnFamilyDescriptor[] { cbuilder.build() }; + LOG.info("Creating table {} with {} splits.", tableName, + regionsCountPerServer * regionServerCount); + startTime = EnvironmentEdgeManager.currentTime(); + createPreSplitLoadTestTable(util.getConfiguration(), desc, columns, regionsCountPerServer); + util.waitTableAvailable(tableName); + endTime = EnvironmentEdgeManager.currentTime(); + LOG.info("Pre-split table created successfully in {}ms.", (endTime - startTime)); } - private void verifyAllBackupTypesExist(String fullBackupId, String[] incrementalBackups) - throws IOException { - // The full backup should exist - assertTrue("Full backup " + fullBackupId + " should still exist inside of " + backupRootDir, - fs.exists(new Path(backupRootDir, fullBackupId))); - // All incremental backups should exist - for (String backupId : incrementalBackups) { - assertTrue("Incremental backup " + backupId + " should still exist inside of " + backupRootDir, - fs.exists(new Path(backupRootDir, backupId))); - } + private void loadData(TableName table, int numRows) throws IOException { + Connection conn = util.getConnection(); + // #0- insert some data to a table + Table t1 = conn.getTable(table); + util.loadRandomRows(t1, new byte[] { 'f' }, 100, numRows); + // flush table + conn.getAdmin().flush(TableName.valueOf(table.getName())); + } + + private String backup(BackupRequest request, BackupAdmin client) throws IOException { + return client.backupTables(request); + } + + private void restore(RestoreRequest request, BackupAdmin client) throws IOException { + client.restore(request); + } + + private void merge(String[] backupIds, BackupAdmin client) throws IOException { + client.mergeBackups(backupIds); } - private void runInputScanner() { - Scanner scanner = new Scanner(System.in); - System.out.println("kevin: Waiting for the user to input a line"); - String line = scanner.nextLine(); - System.out.printf("kevin: Got a line: '%s'. Continuing%n", line); + private void delete(String[] backupIds, BackupAdmin client) throws IOException { + client.deleteBackups(backupIds); } + /** + * Verifies a snapshot's "data.manifest" file exists after a full backup has been performed for a + * table. 
The "data.manifest" file's path will look like the following: + * .../backupRootDir/backup_1760572298945/default//.hbase-snapshot/ + * snapshot_1760572306407_default_/data.manifest + */ private void verifySnapshotExists(TableName tableName, String backupId) throws IOException { RemoteIterator fileStatusIterator = fs.listFiles(new Path(backupRootDir, backupId), true); Path dataManifestPath = null; @@ -417,10 +391,11 @@ private void verifySnapshotExists(TableName tableName, String backupId) throws I } if (dataManifestPath == null) { - fail("Could not find snapshot manifest for table '" + tableName + "'"); + fail("Could not find snapshot data manifest for table '" + tableName + "'"); } } + /** Verifies the .../backupWALDir/WALs directory exists and returns its Path */ private Path verifyWALsDirectoryExists() throws IOException { String backupWALDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); Path backupWALs = new Path(backupWALDir, "WALs"); @@ -463,6 +438,13 @@ private Path verifyWALPartitionDirExists(Path backupWALs) throws IOException, return walPartitionDir; } + /** + * Verifies the WAL partition directory contains a backup WAL file + * The WAL file's path will look something like the following: + * .../backupWALDir/WALs/2025-10-17/wal_file.1760738249595.1880be89-0b69-4bad-8d0e-acbf25c63b7e + * @param walPartitionDir The date directory for a backip WAL + * i.e. .../backupWALDir/WALs/2025-10-17 + */ private void verifyBackupWALFiles(Path walPartitionDir) throws IOException { FileStatus[] fileStatuses = fs.listStatus(walPartitionDir); for (FileStatus fileStatus : fileStatuses) { @@ -476,6 +458,9 @@ private void verifyBackupWALFiles(Path walPartitionDir) throws IOException { } } + /** + * Restores a table using the provided backup ID and ensure the table has the correct row count after + */ private void restoreTableAndVerifyRowCount(Connection conn, BackupAdmin client, TableName table, String backupId, long expectedRows) throws IOException { TableName[] tablesRestoreIncMultiple = new TableName[] { table }; @@ -487,6 +472,10 @@ private void restoreTableAndVerifyRowCount(Connection conn, BackupAdmin client, hTable.close(); } + /** + * Uses the list of all backup IDs to return a sublist of incremental backup IDs. This method + * assumes the first backup in the list is a full backup, followed by incremental backups. + */ private String[] getAllIncrementalBackupIds(List backupIds) { int size = backupIds.size(); backupIds = backupIds.subList(1, size); @@ -526,6 +515,41 @@ public RestoreRequest createRestoreRequest(String backupId, boolean check, .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); } + /** + * Performs the delete command for the most recently taken incremental backup, and also removes + * this backup from the list of backup IDs. 
+ */ + private void deleteMostRecentIncrementalBackup(List backupIds, BackupAdmin client) + throws IOException { + String incrementalBackupId = backupIds.get(backupIds.size() - 1); + LOG.info("Deleting the most recently created incremental backup: {}", incrementalBackupId); + assertTrue("Final incremental backup " + incrementalBackupId + + " should still exist inside of " + backupRootDir, + fs.exists(new Path(backupRootDir, incrementalBackupId))); + + delete(new String[] {incrementalBackupId}, client); + backupIds.remove(backupIds.size() - 1); + + assertFalse("Final incremental backup " + incrementalBackupId + + " should no longer exist inside of " + backupRootDir, + fs.exists(new Path(backupRootDir, incrementalBackupId))); + } + + /** + * Verifies all backups in the list of backup IDs actually exist on the filesystem. + */ + private void verifyAllBackupTypesExist(String fullBackupId, String[] incrementalBackups) + throws IOException { + // The full backup should exist + assertTrue("Full backup " + fullBackupId + " should still exist inside of " + backupRootDir, + fs.exists(new Path(backupRootDir, fullBackupId))); + // All incremental backups should exist + for (String backupId : incrementalBackups) { + assertTrue("Incremental backup " + backupId + " should still exist inside of " + backupRootDir, + fs.exists(new Path(backupRootDir, backupId))); + } + } + @Override public void setUpCluster() throws Exception { util = getTestingUtil(getConf()); From 8bac8e6c1e650ce5d11442a19c1f0f173ffaa443 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Thu, 23 Oct 2025 19:08:29 -0700 Subject: [PATCH 07/19] Run mvn spotless:apply Change-Id: I98eb019dd93dfc8e21b6c730e0e2e60314102724 --- .../hadoop/hbase/backup/BackupTestUtil.java | 3 +- .../hbase/backup/TestContinuousBackup.java | 3 - .../IntegrationTestBackupRestoreBase.java | 124 +++++++++--------- 3 files changed, 63 insertions(+), 67 deletions(-) diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java index 7b93d3fcc4af..4d9577431d3a 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/BackupTestUtil.java @@ -58,7 +58,8 @@ public static void enableBackup(Configuration conf) { BackupManager.decorateRegionServerConfiguration(conf); } - public static void verifyReplicationPeerSubscription(HBaseTestingUtil util, TableName tableName) throws IOException { + public static void verifyReplicationPeerSubscription(HBaseTestingUtil util, TableName tableName) + throws IOException { try (Admin admin = util.getAdmin()) { ReplicationPeerDescription peerDesc = admin.listReplicationPeers().stream() .filter(peer -> peer.getPeerId().equals(CONTINUOUS_BACKUP_REPLICATION_PEER)).findFirst() diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java index e76bfa0c090e..874ae88320e1 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestContinuousBackup.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.backup; import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR; -import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONTINUOUS_BACKUP_REPLICATION_PEER; import static 
org.apache.hadoop.hbase.backup.BackupRestoreConstants.OPTION_ENABLE_CONTINUOUS_BACKUP; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -36,8 +35,6 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupManifest; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.util.ToolRunner; import org.junit.After; diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 85aeb39ad9a7..5e38a1acae52 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -61,12 +61,12 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; /** - * An abstract base class that is used to run backup, restore, and delete integration tests. This class - * performs both full backups and incremental backups. Both continuous backup and non-continuous - * backup test cases are supported. The number of incremental backups performed depends on the number - * of iterations defined by the user. The class performs the backup/restore in a separate thread, where - * one thread is created per table. The number of tables is user-defined, along with other various - * configurations. + * An abstract base class that is used to run backup, restore, and delete integration tests. This + * class performs both full backups and incremental backups. Both continuous backup and + * non-continuous backup test cases are supported. The number of incremental backups performed + * depends on the number of iterations defined by the user. The class performs the backup/restore in + * a separate thread, where one thread is created per table. The number of tables is user-defined, + * along with other various configurations. */ public abstract class IntegrationTestBackupRestoreBase extends IntegrationTestBase { protected static final Logger LOG = @@ -176,8 +176,7 @@ protected void createTables(String tableBaseName) throws Exception { * Creates a directory specified by backupWALDir and sets this directory to * CONF_CONTINUOUS_BACKUP_WAL_DIR in the configuration. 
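* Continuous backup replicates WAL files beneath this directory (for example
* WALs/2025-10-17/wal_file.&lt;timestamp&gt;.&lt;uuid&gt;), which is what the continuous backup
* verifications in this class check for.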
*/ - protected void createAndSetBackupWalDir() - throws IOException { + protected void createAndSetBackupWalDir() throws IOException { Path root = util.getDataTestDirOnTestFS(); Path backupWalDir = new Path(root, "backupWALDir"); FileSystem fs = FileSystem.get(conf); @@ -262,13 +261,13 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) // Now continue with incremental backups String incrementalBackupId; for (int count = 1; count <= numIterations; count++) { - LOG.info("{} - Starting incremental backup iteration {} of {} for {}", Thread.currentThread().getName(), count, - numIterations, table); + LOG.info("{} - Starting incremental backup iteration {} of {} for {}", + Thread.currentThread().getName(), count, numIterations, table); loadData(table, rowsInIteration); // Do incremental backup - LOG.info("Creating incremental backup number {} with continuous backup {} for {}", - count, enabledOrDisabled, table); + LOG.info("Creating incremental backup number {} with continuous backup {} for {}", count, + enabledOrDisabled, table); builder = new BackupRequest.Builder(); request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables) .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled) @@ -284,15 +283,15 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) if (previousBackupId.equals(fullBackupId)) { LOG.info("Restoring {} using original full backup with ID: {}", table, previousBackupId); } else { - LOG.info("Restoring {} using second most recent incremental backup with ID: {}", - table, previousBackupId); + LOG.info("Restoring {} using second most recent incremental backup with ID: {}", table, + previousBackupId); } restoreTableAndVerifyRowCount(conn, client, table, previousBackupId, (long) rowsInIteration * count); // Restore table using the most recently created incremental backup - LOG.info("Restoring {} using most recent incremental backup with ID: {}", - table, incrementalBackupId); + LOG.info("Restoring {} using most recent incremental backup with ID: {}", table, + incrementalBackupId); restoreTableAndVerifyRowCount(conn, client, table, incrementalBackupId, (long) rowsInIteration * (count + 1)); LOG.info("{} - Finished incremental backup iteration {} of {} for {}", @@ -306,8 +305,8 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) incrementalBackupId = incrementalBackupIds[incrementalBackupIds.length - 1]; // restore incremental backup for table, with overwrite TableName[] tablesToRestoreFrom = new TableName[] { table }; - restore(createRestoreRequest(incrementalBackupId, false, tablesToRestoreFrom, - null, true), client); + restore(createRestoreRequest(incrementalBackupId, false, tablesToRestoreFrom, null, true), + client); Table hTable = conn.getTable(table); Assert.assertEquals(rowsInIteration * (numIterations + 1), HBaseTestingUtil.countRows(hTable)); @@ -319,7 +318,7 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) // Delete the full backup LOG.info("Deleting full backup: {}. 
This will also delete any remaining incremental backups", fullBackupId); - delete(new String[] {fullBackupId}, client); + delete(new String[] { fullBackupId }, client); // The full backup and all incremental backups should now be deleted for (String backupId : backupIds) { assertFalse("The backup " + backupId + " should no longer exist", @@ -374,19 +373,18 @@ private void delete(String[] backupIds, BackupAdmin client) throws IOException { /** * Verifies a snapshot's "data.manifest" file exists after a full backup has been performed for a * table. The "data.manifest" file's path will look like the following: - * .../backupRootDir/backup_1760572298945/default//.hbase-snapshot/ - * snapshot_1760572306407_default_/data.manifest + * .../backupRootDir/backup_1760572298945/default//.hbase-snapshot/ + * snapshot_1760572306407_default_/data.manifest */ private void verifySnapshotExists(TableName tableName, String backupId) throws IOException { - RemoteIterator fileStatusIterator = fs.listFiles(new Path(backupRootDir, backupId), true); + RemoteIterator fileStatusIterator = + fs.listFiles(new Path(backupRootDir, backupId), true); Path dataManifestPath = null; while (fileStatusIterator.hasNext()) { LocatedFileStatus fileStatus = fileStatusIterator.next(); if (fileStatus.getPath().getName().endsWith("data.manifest")) { dataManifestPath = fileStatus.getPath(); - LOG.info( - "Found snapshot manifest for table '{}' at: {}", - tableName, dataManifestPath); + LOG.info("Found snapshot manifest for table '{}' at: {}", tableName, dataManifestPath); } } @@ -399,39 +397,40 @@ private void verifySnapshotExists(TableName tableName, String backupId) throws I private Path verifyWALsDirectoryExists() throws IOException { String backupWALDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR); Path backupWALs = new Path(backupWALDir, "WALs"); - assertTrue("There should be a WALs directory inside of the backup WAL directory at: " - + backupWALDir, fs.exists(backupWALs)); + assertTrue( + "There should be a WALs directory inside of the backup WAL directory at: " + backupWALDir, + fs.exists(backupWALs)); return backupWALs; } /** * Waits for a WAL partition directory to exist inside the backup WAL directory. The directory - * should be something like: .../backupWALDir/WALs/2025-10-17. The directory's existence is - * either eventually asserted, or an assertion error is thrown if it does not exist past the wait + * should be something like: .../backupWALDir/WALs/2025-10-17. The directory's existence is either + * eventually asserted, or an assertion error is thrown if it does not exist past the wait * deadline. This verification is to be used for full backups with continuous backup enabled. - * @param backupWALs The directory that should contain the partition directory. - * i.e. .../backupWALDir/WALs + * @param backupWALs The directory that should contain the partition directory. i.e. 
+ * .../backupWALDir/WALs * @return The Path to the WAL partition directory */ - private Path verifyWALPartitionDirExists(Path backupWALs) throws IOException, - InterruptedException { + private Path verifyWALPartitionDirExists(Path backupWALs) + throws IOException, InterruptedException { long currentTimeMs = System.currentTimeMillis(); String currentDateUTC = BackupUtils.formatToDateString(currentTimeMs); Path walPartitionDir = new Path(backupWALs, currentDateUTC); int waitTimeSec = 30; while (true) { try { - assertTrue("A backup WALs subdirectory with today's date should exist: " - + walPartitionDir, fs.exists(walPartitionDir)); + assertTrue("A backup WALs subdirectory with today's date should exist: " + walPartitionDir, + fs.exists(walPartitionDir)); // The directory exists - stop waiting break; } catch (AssertionError e) { // Reach here when the directory currently does not exist - if ((System.currentTimeMillis() - currentTimeMs) >= waitTimeSec*1000) { + if ((System.currentTimeMillis() - currentTimeMs) >= waitTimeSec * 1000) { throw new AssertionError(e); } - LOG.info("Waiting up to {} seconds for WAL partition directory to exist: {}", - waitTimeSec, walPartitionDir); + LOG.info("Waiting up to {} seconds for WAL partition directory to exist: {}", waitTimeSec, + walPartitionDir); Thread.sleep(1000); } } @@ -439,11 +438,11 @@ private Path verifyWALPartitionDirExists(Path backupWALs) throws IOException, } /** - * Verifies the WAL partition directory contains a backup WAL file - * The WAL file's path will look something like the following: - * .../backupWALDir/WALs/2025-10-17/wal_file.1760738249595.1880be89-0b69-4bad-8d0e-acbf25c63b7e - * @param walPartitionDir The date directory for a backip WAL - * i.e. .../backupWALDir/WALs/2025-10-17 + * Verifies the WAL partition directory contains a backup WAL file The WAL file's path will look + * something like the following: + * .../backupWALDir/WALs/2025-10-17/wal_file.1760738249595.1880be89-0b69-4bad-8d0e-acbf25c63b7e + * @param walPartitionDir The date directory for a backip WAL i.e. 
+ * .../backupWALDir/WALs/2025-10-17 */ private void verifyBackupWALFiles(Path walPartitionDir) throws IOException { FileStatus[] fileStatuses = fs.listStatus(walPartitionDir); @@ -452,21 +451,20 @@ private void verifyBackupWALFiles(Path walPartitionDir) throws IOException { String[] splitName = walFileName.split("\\."); assertEquals("The WAL partition directory should only have files that start with 'wal_file'", "wal_file", splitName[0]); - assertEquals("The timestamp in the WAL file's name should match the date for the WAL partition directory", - walPartitionDir.getName(), BackupUtils.formatToDateString( - Long.parseLong(splitName[1]))); + assertEquals( + "The timestamp in the WAL file's name should match the date for the WAL partition directory", + walPartitionDir.getName(), BackupUtils.formatToDateString(Long.parseLong(splitName[1]))); } } /** - * Restores a table using the provided backup ID and ensure the table has the correct row count after + * Restores a table using the provided backup ID and ensure the table has the correct row count + * after */ private void restoreTableAndVerifyRowCount(Connection conn, BackupAdmin client, TableName table, String backupId, long expectedRows) throws IOException { TableName[] tablesRestoreIncMultiple = new TableName[] { table }; - restore( - createRestoreRequest(backupId, false, tablesRestoreIncMultiple, null, true), - client); + restore(createRestoreRequest(backupId, false, tablesRestoreIncMultiple, null, true), client); Table hTable = conn.getTable(table); Assert.assertEquals(expectedRows, HBaseTestingUtil.countRows(hTable)); hTable.close(); @@ -501,15 +499,15 @@ private BackupInfo getBackupInfo(String backupId) throws IOException { /** * Get restore request. - * @param backupId backup ID - * @param check check the backup - * @param fromTables table names to restore from - * @param toTables new table names to restore to - * @param isOverwrite overwrite the table(s) + * @param backupId backup ID + * @param check check the backup + * @param fromTables table names to restore from + * @param toTables new table names to restore to + * @param isOverwrite overwrite the table(s) * @return an instance of RestoreRequest */ - public RestoreRequest createRestoreRequest(String backupId, boolean check, - TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { + public RestoreRequest createRestoreRequest(String backupId, boolean check, TableName[] fromTables, + TableName[] toTables, boolean isOverwrite) { RestoreRequest.Builder builder = new RestoreRequest.Builder(); return builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check) .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); @@ -523,15 +521,14 @@ private void deleteMostRecentIncrementalBackup(List backupIds, BackupAdm throws IOException { String incrementalBackupId = backupIds.get(backupIds.size() - 1); LOG.info("Deleting the most recently created incremental backup: {}", incrementalBackupId); - assertTrue("Final incremental backup " + incrementalBackupId - + " should still exist inside of " + backupRootDir, - fs.exists(new Path(backupRootDir, incrementalBackupId))); + assertTrue("Final incremental backup " + incrementalBackupId + " should still exist inside of " + + backupRootDir, fs.exists(new Path(backupRootDir, incrementalBackupId))); - delete(new String[] {incrementalBackupId}, client); + delete(new String[] { incrementalBackupId }, client); backupIds.remove(backupIds.size() - 1); assertFalse("Final incremental backup " + 
incrementalBackupId - + " should no longer exist inside of " + backupRootDir, + + " should no longer exist inside of " + backupRootDir, fs.exists(new Path(backupRootDir, incrementalBackupId))); } @@ -545,7 +542,8 @@ private void verifyAllBackupTypesExist(String fullBackupId, String[] incremental fs.exists(new Path(backupRootDir, fullBackupId))); // All incremental backups should exist for (String backupId : incrementalBackups) { - assertTrue("Incremental backup " + backupId + " should still exist inside of " + backupRootDir, + assertTrue( + "Incremental backup " + backupId + " should still exist inside of " + backupRootDir, fs.exists(new Path(backupRootDir, backupId))); } } From c2ab491b63e75c6c90df28b8db661918306c374a Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Thu, 23 Oct 2025 19:16:55 -0700 Subject: [PATCH 08/19] Add documentation for runTestMulti and runTestSingle Change-Id: I4de6fc485aa1ff6e0d8d837e081f8dde20bb3f67 --- .../backup/IntegrationTestBackupRestoreBase.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 5e38a1acae52..070be697d767 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -192,6 +192,12 @@ private void cleanUpBackupDir() throws IOException { fs.delete(new Path(backupRootDir), true); } + /** + * This is the main driver method used by tests that extend this abstract base class. This method + * kicks off one backup and restore thread per table. + * @param isContinuousBackupEnabled Boolean flag used to specify if the backups should have + * continuous backup enabled. + */ protected void runTestMulti(boolean isContinuousBackupEnabled) { Thread[] workers = new Thread[numTables]; BackupAndRestoreThread[] backupAndRestoreThreads = new BackupAndRestoreThread[numTables]; @@ -225,6 +231,14 @@ protected void runTestMulti(boolean isContinuousBackupEnabled) { LOG.info("IT backup & restore finished"); } + /** + * This method is what performs the actual backup, restore, merge, and delete operations. This + * method is run in a separate thread. It first performs a full backup. After, it iteratively + * performs a series of incremental backups and restores. Later, it deletes the backups. + * @param table The table the backups are performed on + * @param isContinuousBackupEnabled Boolean flag used to indicate if the backups should have + * continuous backup enabled. + */ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) throws IOException, InterruptedException { String enabledOrDisabled = isContinuousBackupEnabled ? 
"enabled" : "disabled"; From 3024ea62663ac026a203e83931a71f0f221480f3 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Thu, 23 Oct 2025 19:20:49 -0700 Subject: [PATCH 09/19] Update documentation Change-Id: I911180a8f263f801a5c299d43d0215fe444f22d3 --- .../hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 070be697d767..521e903c9c14 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -194,7 +194,7 @@ private void cleanUpBackupDir() throws IOException { /** * This is the main driver method used by tests that extend this abstract base class. This method - * kicks off one backup and restore thread per table. + * starts one backup and restore thread per table. * @param isContinuousBackupEnabled Boolean flag used to specify if the backups should have * continuous backup enabled. */ From a7fdfc2acdc5ffa418fce79abfbc71d98bf9c503 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Fri, 24 Oct 2025 20:35:28 -0700 Subject: [PATCH 10/19] Enhance delete test case Change-Id: I78fe59f800cde7c89b11760a49d774c5173a862c --- .../IntegrationTestBackupRestoreBase.java | 104 ++++++++++++------ 1 file changed, 70 insertions(+), 34 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 521e903c9c14..5c1a0981d9c3 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -256,8 +256,7 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled) .build(); - String fullBackupId = backup(request, client); - assertTrue(checkSucceeded(fullBackupId)); + String fullBackupId = backup(request, client, backupIds); LOG.info("Created full backup with ID: {}", fullBackupId); verifySnapshotExists(table, fullBackupId); @@ -270,8 +269,6 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) verifyBackupWALFiles(walPartitionDir); } - backupIds.add(fullBackupId); - // Now continue with incremental backups String incrementalBackupId; for (int count = 1; count <= numIterations; count++) { @@ -286,10 +283,8 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables) .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled) .build(); - incrementalBackupId = backup(request, client); - assertTrue(checkSucceeded(incrementalBackupId)); + incrementalBackupId = backup(request, client, backupIds); LOG.info("Created incremental backup with ID: {}", incrementalBackupId); - backupIds.add(incrementalBackupId); // Restore table using backup taken "two backups ago" // On the first iteration, this backup will be the full backup @@ -315,7 +310,9 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) // Now merge all incremental and restore 
String[] incrementalBackupIds = getAllIncrementalBackupIds(backupIds); merge(incrementalBackupIds, client); - // Restore last one + verifyExistingBackupsAfterMerge(backupIds); + removeNonexistentBackups(backupIds); + // Restore the last incremental backup incrementalBackupId = incrementalBackupIds[incrementalBackupIds.length - 1]; // restore incremental backup for table, with overwrite TableName[] tablesToRestoreFrom = new TableName[] { table }; @@ -326,18 +323,18 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) HBaseTestingUtil.countRows(hTable)); hTable.close(); + // Create another incremental backup to show it can be deleted on its own + backup(request, client, backupIds); deleteMostRecentIncrementalBackup(backupIds, client); - // The full backup and all previous incremental backups should still exist - verifyAllBackupTypesExist(fullBackupId, getAllIncrementalBackupIds(backupIds)); - // Delete the full backup + // The full backup and the second most recent incremental backup should still exist + assertEquals(2, backupIds.size()); + verifyAllBackupsExist(backupIds); + // Delete the full backup, which should also automatically delete any incremental backups that + // depend on it LOG.info("Deleting full backup: {}. This will also delete any remaining incremental backups", fullBackupId); delete(new String[] { fullBackupId }, client); - // The full backup and all incremental backups should now be deleted - for (String backupId : backupIds) { - assertFalse("The backup " + backupId + " should no longer exist", - fs.exists(new Path(backupRootDir, backupId))); - } + verifyNoBackupsExist(backupIds); } } @@ -368,8 +365,12 @@ private void loadData(TableName table, int numRows) throws IOException { conn.getAdmin().flush(TableName.valueOf(table.getName())); } - private String backup(BackupRequest request, BackupAdmin client) throws IOException { - return client.backupTables(request); + private String backup(BackupRequest request, BackupAdmin client, List backupIds) throws IOException { + String backupId = client.backupTables(request); + assertTrue(checkSucceeded(backupId)); + verifyBackupExists(backupId); + backupIds.add(backupId); + return backupId; } private void restore(RestoreRequest request, BackupAdmin client) throws IOException { @@ -527,6 +528,32 @@ public RestoreRequest createRestoreRequest(String backupId, boolean check, Table .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); } + private void verifyExistingBackupsAfterMerge(List backupIds) + throws IOException { + String fullBackupId = backupIds.get(0); + String mostRecentIncrementalBackup = backupIds.get(backupIds.size() - 1); + for (String backupId : backupIds) { + if (backupId.equals(fullBackupId) || backupId.equals(mostRecentIncrementalBackup)) { + verifyBackupExists(backupId); + } else { + verifyBackupDoesNotExist(backupId); + } + } + } + + private void removeNonexistentBackups(List backupIds) throws IOException { + List backupsToRemove = new ArrayList<>(); + for (String backupId : backupIds) { + if (!fs.exists(new Path(backupRootDir, backupId))) { + backupsToRemove.add(backupId); + } + } + for (String backupId : backupsToRemove) { + LOG.info("Removing {} from list of backup IDs since it no longer exists", backupId); + backupIds.remove(backupId); + } + } + /** * Performs the delete command for the most recently taken incremental backup, and also removes * this backup from the list of backup IDs. 
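The helpers added above encode the merge semantics the test relies on: merging collapses the
chain of incremental images into the most recent one, so after mergeBackups only the original
full image and the latest incremental image should remain under the backup root. A minimal
sketch of that expectation, using illustrative names rather than the patch's own API:

  private void assertOnlyFullAndLatestRemainAfterMerge(FileSystem fs, String backupRootDir,
      List<String> backupIds) throws IOException {
    String fullBackupId = backupIds.get(0);
    String latestIncrementalId = backupIds.get(backupIds.size() - 1);
    for (String backupId : backupIds) {
      boolean shouldExist =
        backupId.equals(fullBackupId) || backupId.equals(latestIncrementalId);
      assertEquals("Unexpected post-merge state for " + backupId, shouldExist,
        fs.exists(new Path(backupRootDir, backupId)));
    }
  }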
@@ -535,33 +562,42 @@ private void deleteMostRecentIncrementalBackup(List backupIds, BackupAdm throws IOException { String incrementalBackupId = backupIds.get(backupIds.size() - 1); LOG.info("Deleting the most recently created incremental backup: {}", incrementalBackupId); - assertTrue("Final incremental backup " + incrementalBackupId + " should still exist inside of " - + backupRootDir, fs.exists(new Path(backupRootDir, incrementalBackupId))); - + verifyBackupExists(incrementalBackupId); delete(new String[] { incrementalBackupId }, client); backupIds.remove(backupIds.size() - 1); - - assertFalse("Final incremental backup " + incrementalBackupId - + " should no longer exist inside of " + backupRootDir, - fs.exists(new Path(backupRootDir, incrementalBackupId))); + verifyBackupDoesNotExist(incrementalBackupId); } /** * Verifies all backups in the list of backup IDs actually exist on the filesystem. */ - private void verifyAllBackupTypesExist(String fullBackupId, String[] incrementalBackups) + private void verifyAllBackupsExist(List backupIds) + throws IOException { + for (String backupId : backupIds) { + verifyBackupExists(backupId); + } + } + + /** + * Verifies zero backups in the list of backup IDs exist on the filesystem. + */ + private void verifyNoBackupsExist(List backupIds) throws IOException { - // The full backup should exist - assertTrue("Full backup " + fullBackupId + " should still exist inside of " + backupRootDir, - fs.exists(new Path(backupRootDir, fullBackupId))); - // All incremental backups should exist - for (String backupId : incrementalBackups) { - assertTrue( - "Incremental backup " + backupId + " should still exist inside of " + backupRootDir, - fs.exists(new Path(backupRootDir, backupId))); + for (String backupId : backupIds) { + verifyBackupDoesNotExist(backupId); } } + private void verifyBackupExists(String backupId) throws IOException { + assertTrue("Backup " + backupId + " should exist inside of " + backupRootDir, + fs.exists(new Path(backupRootDir, backupId))); + } + + private void verifyBackupDoesNotExist(String backupId) throws IOException { + assertFalse("Backup " + backupId + " should not exist inside of " + backupRootDir, + fs.exists(new Path(backupRootDir, backupId))); + } + @Override public void setUpCluster() throws Exception { util = getTestingUtil(getConf()); From d0eb5eefa03b19ef0520533cdd16a589dbf9d083 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Mon, 27 Oct 2025 10:11:21 -0700 Subject: [PATCH 11/19] Update method name to verifyBackupExistenceAfterMerge Change-Id: Ia150d21f48bb160d9e8bcf922799dc18c0b7c77c --- .../IntegrationTestBackupRestoreBase.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 5c1a0981d9c3..02997a384855 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -310,7 +310,7 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) // Now merge all incremental and restore String[] incrementalBackupIds = getAllIncrementalBackupIds(backupIds); merge(incrementalBackupIds, client); - verifyExistingBackupsAfterMerge(backupIds); + verifyBackupExistenceAfterMerge(backupIds); removeNonexistentBackups(backupIds); // Restore the last 
incremental backup incrementalBackupId = incrementalBackupIds[incrementalBackupIds.length - 1]; @@ -365,7 +365,8 @@ private void loadData(TableName table, int numRows) throws IOException { conn.getAdmin().flush(TableName.valueOf(table.getName())); } - private String backup(BackupRequest request, BackupAdmin client, List backupIds) throws IOException { + private String backup(BackupRequest request, BackupAdmin client, List backupIds) + throws IOException { String backupId = client.backupTables(request); assertTrue(checkSucceeded(backupId)); verifyBackupExists(backupId); @@ -528,8 +529,12 @@ public RestoreRequest createRestoreRequest(String backupId, boolean check, Table .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); } - private void verifyExistingBackupsAfterMerge(List backupIds) - throws IOException { + /** + * Iterates through the list of backups and verifies the full backup and latest incremental backup + * still exist, while also verifying all other backups no longer exist. This method is meant to be + * run after all incremental backups have been merged. + */ + private void verifyBackupExistenceAfterMerge(List backupIds) throws IOException { String fullBackupId = backupIds.get(0); String mostRecentIncrementalBackup = backupIds.get(backupIds.size() - 1); for (String backupId : backupIds) { @@ -571,8 +576,7 @@ private void deleteMostRecentIncrementalBackup(List backupIds, BackupAdm /** * Verifies all backups in the list of backup IDs actually exist on the filesystem. */ - private void verifyAllBackupsExist(List backupIds) - throws IOException { + private void verifyAllBackupsExist(List backupIds) throws IOException { for (String backupId : backupIds) { verifyBackupExists(backupId); } @@ -581,8 +585,7 @@ private void verifyAllBackupsExist(List backupIds) /** * Verifies zero backups in the list of backup IDs exist on the filesystem. 
*/ - private void verifyNoBackupsExist(List backupIds) - throws IOException { + private void verifyNoBackupsExist(List backupIds) throws IOException { for (String backupId : backupIds) { verifyBackupDoesNotExist(backupId); } From 39aaa6cea1cdbb47d57dba5ae556bfb3cb861a97 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Wed, 29 Oct 2025 18:10:27 -0700 Subject: [PATCH 12/19] Address review comments Change-Id: I9d5b55e36b44367ac8ace08a5859c42b796fefd4 --- .../backup/IntegrationTestBackupRestore.java | 2 +- .../IntegrationTestBackupRestoreBase.java | 19 +++++++++++-------- ...ntegrationTestContinuousBackupRestore.java | 2 +- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java index c363ee9501c1..74c51c26fd4a 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java @@ -61,7 +61,7 @@ public void setUp() throws Exception { util.initializeCluster(regionServerCount); LOG.info("Cluster initialized and ready"); - backupRootDir = util.getDataTestDirOnTestFS() + Path.SEPARATOR + backupRootDir; + backupRootDir = util.getDataTestDirOnTestFS() + Path.SEPARATOR + DEFAULT_BACKUP_ROOT_DIR; LOG.info("The backup root directory is: {}", backupRootDir); fs = FileSystem.get(conf); } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 02997a384855..0f125bbbe800 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -85,6 +85,7 @@ public abstract class IntegrationTestBackupRestoreBase extends IntegrationTestBa protected static final String SLEEP_TIME_KEY = "sleeptime"; // short default interval because tests don't run very long. protected static final long SLEEP_TIME_DEFAULT = 50000L; + protected static String DEFAULT_BACKUP_ROOT_DIR = "backupIT"; protected static int rowsInIteration; protected static int regionsCountPerServer; @@ -97,7 +98,7 @@ public abstract class IntegrationTestBackupRestoreBase extends IntegrationTestBa protected static Object lock = new Object(); protected FileSystem fs; - protected String backupRootDir = "backupRootDir"; + protected String backupRootDir; /* * This class is used to run the backup and restore thread(s). 
Throwing an exception in this @@ -316,8 +317,8 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) incrementalBackupId = incrementalBackupIds[incrementalBackupIds.length - 1]; // restore incremental backup for table, with overwrite TableName[] tablesToRestoreFrom = new TableName[] { table }; - restore(createRestoreRequest(incrementalBackupId, false, tablesToRestoreFrom, null, true), - client); + restore(createRestoreRequest(backupRootDir, incrementalBackupId, false, tablesToRestoreFrom, + null, true), client); Table hTable = conn.getTable(table); Assert.assertEquals(rowsInIteration * (numIterations + 1), HBaseTestingUtil.countRows(hTable)); @@ -454,10 +455,10 @@ private Path verifyWALPartitionDirExists(Path backupWALs) } /** - * Verifies the WAL partition directory contains a backup WAL file The WAL file's path will look + * Verifies the WAL partition directory contains a backup WAL file. The WAL file's path will look * something like the following: * .../backupWALDir/WALs/2025-10-17/wal_file.1760738249595.1880be89-0b69-4bad-8d0e-acbf25c63b7e - * @param walPartitionDir The date directory for a backip WAL i.e. + * @param walPartitionDir The date directory for a backup WAL i.e. * .../backupWALDir/WALs/2025-10-17 */ private void verifyBackupWALFiles(Path walPartitionDir) throws IOException { @@ -480,7 +481,9 @@ private void verifyBackupWALFiles(Path walPartitionDir) throws IOException { private void restoreTableAndVerifyRowCount(Connection conn, BackupAdmin client, TableName table, String backupId, long expectedRows) throws IOException { TableName[] tablesRestoreIncMultiple = new TableName[] { table }; - restore(createRestoreRequest(backupId, false, tablesRestoreIncMultiple, null, true), client); + restore( + createRestoreRequest(backupRootDir, backupId, false, tablesRestoreIncMultiple, null, true), + client); Table hTable = conn.getTable(table); Assert.assertEquals(expectedRows, HBaseTestingUtil.countRows(hTable)); hTable.close(); @@ -522,8 +525,8 @@ private BackupInfo getBackupInfo(String backupId) throws IOException { * @param isOverwrite overwrite the table(s) * @return an instance of RestoreRequest */ - public RestoreRequest createRestoreRequest(String backupId, boolean check, TableName[] fromTables, - TableName[] toTables, boolean isOverwrite) { + public RestoreRequest createRestoreRequest(String backupRootDir, String backupId, boolean check, + TableName[] fromTables, TableName[] toTables, boolean isOverwrite) { RestoreRequest.Builder builder = new RestoreRequest.Builder(); return builder.withBackupRootDir(backupRootDir).withBackupId(backupId).withCheck(check) .withFromTables(fromTables).withToTables(toTables).withOvewrite(isOverwrite).build(); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java index 7ecf25862d68..0f667cb2172d 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java @@ -80,7 +80,7 @@ public void setUp() throws Exception { util.initializeCluster(regionServerCount); LOG.info("Cluster initialized and ready"); - backupRootDir = util.getDataTestDirOnTestFS() + Path.SEPARATOR + backupRootDir; + backupRootDir = util.getDataTestDirOnTestFS() + Path.SEPARATOR + DEFAULT_BACKUP_ROOT_DIR; LOG.info("The backup root directory is: {}", 
backupRootDir); createAndSetBackupWalDir(); fs = FileSystem.get(conf); From 526b3ec4c54c3b8b3eeb635c80eacfb8438024e8 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Fri, 31 Oct 2025 12:20:07 -0700 Subject: [PATCH 13/19] Add wait for region servers in replication checkpoint to catch up with latest Put timestamp Change-Id: Ic438ca292bc01827d46725e006bfa0c21bc95f01 --- .../IntegrationTestBackupRestoreBase.java | 128 +++++++++++++++--- 1 file changed, 108 insertions(+), 20 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 0f125bbbe800..5cc2619e6a23 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -28,14 +28,17 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.IntegrationTestBase; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; @@ -47,6 +50,9 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; @@ -236,22 +242,23 @@ protected void runTestMulti(boolean isContinuousBackupEnabled) { * This method is what performs the actual backup, restore, merge, and delete operations. This * method is run in a separate thread. It first performs a full backup. After, it iteratively * performs a series of incremental backups and restores. Later, it deletes the backups. - * @param table The table the backups are performed on + * @param tableName The table the backups are performed on * @param isContinuousBackupEnabled Boolean flag used to indicate if the backups should have * continuous backup enabled. */ - private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) + private void runTestSingle(TableName tableName, boolean isContinuousBackupEnabled) throws IOException, InterruptedException { String enabledOrDisabled = isContinuousBackupEnabled ? 
"enabled" : "disabled"; List backupIds = new ArrayList<>(); - try (Connection conn = util.getConnection(); BackupAdmin client = new BackupAdminImpl(conn)) { - loadData(table, rowsInIteration); + try (Connection conn = util.getConnection(); BackupAdmin client = new BackupAdminImpl(conn); + Table tableConn = conn.getTable(tableName)) { + loadData(tableName, rowsInIteration); // First create a full backup for the table - LOG.info("Creating full backup image for {} with continuous backup {}", table, + LOG.info("Creating full backup image for {} with continuous backup {}", tableName, enabledOrDisabled); - List tables = Lists.newArrayList(table); + List tables = Lists.newArrayList(tableName); BackupRequest.Builder builder = new BackupRequest.Builder(); BackupRequest request = builder.withBackupType(BackupType.FULL).withTableList(tables) .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled) @@ -260,11 +267,11 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) String fullBackupId = backup(request, client, backupIds); LOG.info("Created full backup with ID: {}", fullBackupId); - verifySnapshotExists(table, fullBackupId); + verifySnapshotExists(tableName, fullBackupId); // Run full backup verifications specific to continuous backup if (isContinuousBackupEnabled) { - BackupTestUtil.verifyReplicationPeerSubscription(util, table); + BackupTestUtil.verifyReplicationPeerSubscription(util, tableName); Path backupWALs = verifyWALsDirectoryExists(); Path walPartitionDir = verifyWALPartitionDirExists(backupWALs); verifyBackupWALFiles(walPartitionDir); @@ -274,12 +281,16 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) String incrementalBackupId; for (int count = 1; count <= numIterations; count++) { LOG.info("{} - Starting incremental backup iteration {} of {} for {}", - Thread.currentThread().getName(), count, numIterations, table); - loadData(table, rowsInIteration); + Thread.currentThread().getName(), count, numIterations, tableName); + loadData(tableName, rowsInIteration); + if (isContinuousBackupEnabled) { + long latestPutTimestamp = getLatestPutTimestamp(tableConn); + waitForCheckpointTimestampUpdate(conn, latestPutTimestamp, tableName); + } // Do incremental backup LOG.info("Creating incremental backup number {} with continuous backup {} for {}", count, - enabledOrDisabled, table); + enabledOrDisabled, tableName); builder = new BackupRequest.Builder(); request = builder.withBackupType(BackupType.INCREMENTAL).withTableList(tables) .withTargetRootDir(backupRootDir).withContinuousBackupEnabled(isContinuousBackupEnabled) @@ -291,21 +302,22 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) // On the first iteration, this backup will be the full backup String previousBackupId = backupIds.get(backupIds.size() - 2); if (previousBackupId.equals(fullBackupId)) { - LOG.info("Restoring {} using original full backup with ID: {}", table, previousBackupId); - } else { - LOG.info("Restoring {} using second most recent incremental backup with ID: {}", table, + LOG.info("Restoring {} using original full backup with ID: {}", tableName, previousBackupId); + } else { + LOG.info("Restoring {} using second most recent incremental backup with ID: {}", + tableName, previousBackupId); } - restoreTableAndVerifyRowCount(conn, client, table, previousBackupId, + restoreTableAndVerifyRowCount(conn, client, tableName, previousBackupId, (long) rowsInIteration * count); // Restore table using the most 
recently created incremental backup - LOG.info("Restoring {} using most recent incremental backup with ID: {}", table, + LOG.info("Restoring {} using most recent incremental backup with ID: {}", tableName, incrementalBackupId); - restoreTableAndVerifyRowCount(conn, client, table, incrementalBackupId, + restoreTableAndVerifyRowCount(conn, client, tableName, incrementalBackupId, (long) rowsInIteration * (count + 1)); LOG.info("{} - Finished incremental backup iteration {} of {} for {}", - Thread.currentThread().getName(), count, numIterations, table); + Thread.currentThread().getName(), count, numIterations, tableName); } // Now merge all incremental and restore @@ -316,10 +328,10 @@ private void runTestSingle(TableName table, boolean isContinuousBackupEnabled) // Restore the last incremental backup incrementalBackupId = incrementalBackupIds[incrementalBackupIds.length - 1]; // restore incremental backup for table, with overwrite - TableName[] tablesToRestoreFrom = new TableName[] { table }; + TableName[] tablesToRestoreFrom = new TableName[] { tableName }; restore(createRestoreRequest(backupRootDir, incrementalBackupId, false, tablesToRestoreFrom, null, true), client); - Table hTable = conn.getTable(table); + Table hTable = conn.getTable(tableName); Assert.assertEquals(rowsInIteration * (numIterations + 1), HBaseTestingUtil.countRows(hTable)); hTable.close(); @@ -474,6 +486,82 @@ private void verifyBackupWALFiles(Path walPartitionDir) throws IOException { } } + /** + * Checks if all timestamps in a map with ServerName and timestamp key-values pairs are past the + * provided timestamp threshold. + * @param timestamps Map with ServerName and timestamp key-value pairs + * @param threshold Timestamp to check all timestamp values in the map against + * @return True if all timestamps values in the map have met or are path the timestamp threshold; + * False otherwise + */ + private boolean areAllTimestampsPastThreshold(Map timestamps, long threshold, + TableName tableName) { + boolean haveAllTimestampsReachedThreshold = true; + LOG.info( + "Checking if each region server in the replication check point has caught up to the latest Put on {}", + tableName.getNameAsString()); + for (Map.Entry entry : timestamps.entrySet()) { + LOG.info("host={}, checkpoint timestamp={}, latest put timestamp={}, caught up={}", + entry.getKey(), entry.getValue(), threshold, entry.getValue() >= threshold); + if (entry.getValue() < threshold) { + // Not returning right away so all hosts and timestamps are logged + haveAllTimestampsReachedThreshold = false; + } + } + return haveAllTimestampsReachedThreshold; + } + + /** + * Waits for the replication checkpoint timestamp of each region server to meet or pass the + * timestamp of the latest Put operation on the backed-up table. 
+ * @param conn Minicluster connection + * @param latestPutTimestamp Timestamp of the latest Put operation on the backed-up table + */ + private void waitForCheckpointTimestampUpdate(Connection conn, long latestPutTimestamp, + TableName tableName) throws IOException, InterruptedException { + BackupSystemTable backupSystemTable = new BackupSystemTable(conn); + Map checkpointTimestamps = backupSystemTable.getBackupCheckpointTimestamps(); + int i = 0; + int sleepTimeSec = 10; + int waitThresholdMs = 15 * 60 * 1000; + long waitStartTime = System.currentTimeMillis(); + while (!areAllTimestampsPastThreshold(checkpointTimestamps, latestPutTimestamp, tableName)) { + if ((System.currentTimeMillis() - waitStartTime) >= waitThresholdMs) { + throw new RuntimeException("Timed out waiting for the replication checkpoint timestamp of " + + "each region server to catch up with timestamp of the latest Put on " + + tableName.getNameAsString()); + } + LOG.info( + "Waiting {} seconds for the replication checkpoint timestamp for each region server " + + "to catch up with the timestamp of the latest Put on {}", + sleepTimeSec, tableName.getNameAsString()); + Thread.sleep(sleepTimeSec * 1000); + checkpointTimestamps = backupSystemTable.getBackupCheckpointTimestamps(); + i++; + } + LOG.info("Done waiting. Total wait time: {} seconds", i * sleepTimeSec); + } + + /** + * Scans the backed-up table and returns the timestamp (ms) of the latest Put operation on the + * table. + * @param table The backed-up table to scan + * @return Timestamp of the most recent Put on the backed-up table + */ + private long getLatestPutTimestamp(Table table) throws IOException { + Scan scan = new Scan(); + ResultScanner resultScanner = table.getScanner(scan); + long timestamp = 0; + for (Result result : resultScanner) { + for (Cell cell : result.rawCells()) { + if (cell.getTimestamp() > timestamp) { + timestamp = cell.getTimestamp(); + } + } + } + return timestamp; + } + /** * Restores a table using the provided backup ID and ensure the table has the correct row count * after From 77e22c9bbd5ffd7b1ba02e0c828538920614aa28 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Fri, 31 Oct 2025 16:51:53 -0700 Subject: [PATCH 14/19] Handle command line arg parsing and conf setup in base class Change-Id: I9d52e774e84dc389d42aa63315529a2590c40cb8 --- .../backup/IntegrationTestBackupRestore.java | 31 +------------------ .../IntegrationTestBackupRestoreBase.java | 24 ++++++++++++-- ...ntegrationTestContinuousBackupRestore.java | 25 +-------------- 3 files changed, 24 insertions(+), 56 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java index 74c51c26fd4a..6b9935e7b972 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java @@ -31,9 +31,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; -import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; - /** * An integration test to detect regressions in HBASE-7912. 
Create a table with many regions, load * data, perform series backup/load operations, then restore and verify data @@ -48,14 +45,7 @@ public class IntegrationTestBackupRestore extends IntegrationTestBackupRestoreBa @Override @Before public void setUp() throws Exception { - util = new IntegrationTestingUtility(); - conf = util.getConfiguration(); - regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); - regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT); - rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION); - numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); - numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); - sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); + initializeConfFromCommandLine(); BackupTestUtil.enableBackup(conf); LOG.info("Initializing cluster with {} region servers.", regionServerCount); util.initializeCluster(regionServerCount); @@ -86,25 +76,6 @@ public int runTestFromCommandLine() throws Exception { return 0; } - @Override - protected void addOptions() { - super.addOptions(); - addOptWithArg(REGIONSERVER_COUNT_KEY, - "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'"); - } - - @Override - protected void processOptions(CommandLine cmd) { - super.processOptions(cmd); - regionServerCount = Integer.parseInt( - cmd.getOptionValue(REGIONSERVER_COUNT_KEY, Integer.toString(DEFAULT_REGIONSERVER_COUNT))); - - LOG.info(MoreObjects.toStringHelper("Parsed Options") - .add(REGION_COUNT_KEY, regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount) - .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations) - .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString()); - } - public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); IntegrationTestingUtility.setUseDistributedCluster(conf); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 5cc2619e6a23..b5a14c8781b3 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.IntegrationTestBase; +import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl; @@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; import org.junit.After; import org.junit.Assert; import org.slf4j.Logger; @@ -142,6 +144,17 @@ public void run() { } } + protected void initializeConfFromCommandLine() { + util = new IntegrationTestingUtility(); + conf = util.getConfiguration(); + regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); + regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT); + rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, 
DEFAULT_ROWS_IN_ITERATION); + numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); + numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); + sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); + } + @After public void tearDown() throws IOException { LOG.info("Cleaning up after test."); @@ -285,7 +298,7 @@ private void runTestSingle(TableName tableName, boolean isContinuousBackupEnable loadData(tableName, rowsInIteration); if (isContinuousBackupEnabled) { long latestPutTimestamp = getLatestPutTimestamp(tableConn); - waitForCheckpointTimestampUpdate(conn, latestPutTimestamp, tableName); + waitForCheckpointTimestampsToUpdate(conn, latestPutTimestamp, tableName); } // Do incremental backup @@ -517,7 +530,7 @@ private boolean areAllTimestampsPastThreshold(Map timestamps, * @param conn Minicluster connection * @param latestPutTimestamp Timestamp of the latest Put operation on the backed-up table */ - private void waitForCheckpointTimestampUpdate(Connection conn, long latestPutTimestamp, + private void waitForCheckpointTimestampsToUpdate(Connection conn, long latestPutTimestamp, TableName tableName) throws IOException, InterruptedException { BackupSystemTable backupSystemTable = new BackupSystemTable(conn); Map checkpointTimestamps = backupSystemTable.getBackupCheckpointTimestamps(); @@ -716,6 +729,8 @@ protected Set getColumnFamilies() { @Override protected void addOptions() { + addOptWithArg(REGIONSERVER_COUNT_KEY, + "Total number of region servers. Default: '" + DEFAULT_REGIONSERVER_COUNT + "'"); addOptWithArg(REGION_COUNT_KEY, "Total number of regions. Default: " + DEFAULT_REGION_COUNT); addOptWithArg(ROWS_PER_ITERATION_KEY, "Total number of data rows to be loaded during one iteration." + " Default: " @@ -743,6 +758,11 @@ protected void processOptions(CommandLine cmd) { cmd.getOptionValue(NUMBER_OF_TABLES_KEY, Integer.toString(DEFAULT_NUMBER_OF_TABLES))); sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, Long.toString(SLEEP_TIME_DEFAULT))); + + LOG.info(MoreObjects.toStringHelper("Parsed Options") + .add(REGION_COUNT_KEY, regionsCountPerServer).add(REGIONSERVER_COUNT_KEY, regionServerCount) + .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations) + .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString()); } } diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java index 0f667cb2172d..fdcb5e7ebe06 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java @@ -38,9 +38,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; -import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; - /** * An integration test to detect regressions in HBASE-28957. 
Create a table with many regions, load * data, perform series backup/load operations with continuous backup enabled, then restore and @@ -57,17 +54,7 @@ public class IntegrationTestContinuousBackupRestore extends IntegrationTestBacku @Override @Before public void setUp() throws Exception { - util = new IntegrationTestingUtility(); - conf = util.getConfiguration(); - regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); - // We are using only 1 region server because we cannot wait for all region servers to catch up - // with replication. Therefore, we cannot be sure about how many rows will be restored after an - // incremental backup. - regionServerCount = 1; - rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION); - numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); - numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); - sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); + initializeConfFromCommandLine(); BackupTestUtil.enableBackup(conf); conf.set(CONF_BACKUP_MAX_WAL_SIZE, "10240"); conf.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); @@ -106,16 +93,6 @@ public int runTestFromCommandLine() throws Exception { return 0; } - @Override - protected void processOptions(CommandLine cmd) { - super.processOptions(cmd); - - LOG.info( - MoreObjects.toStringHelper("Parsed Options").add(REGION_COUNT_KEY, regionsCountPerServer) - .add(ROWS_PER_ITERATION_KEY, rowsInIteration).add(NUM_ITERATIONS_KEY, numIterations) - .add(NUMBER_OF_TABLES_KEY, numTables).add(SLEEP_TIME_KEY, sleepTime).toString()); - } - public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); IntegrationTestingUtility.setUseDistributedCluster(conf); From e0fccd13048c333e0c29f98eb8dff40ed3a31bf5 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Mon, 3 Nov 2025 13:34:57 -0800 Subject: [PATCH 15/19] Fix spotless error Change-Id: I27eec25091842376ee7a059a9688c6f5ab385ac7 --- .../hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index b5a14c8781b3..c70ac50dff40 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -58,12 +58,12 @@ import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; import org.junit.After; import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.MoreObjects; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine; From cf91387a626afda289a9ac03dad70feda20ac34e Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Mon, 3 Nov 2025 15:29:18 -0800 Subject: [PATCH 16/19] Fix checkstyle errors for IntegrationTestBackupRestore.java Change-Id: I18ab629df4af4e93b42ec1b0d576fd411279c775 --- .../backup/IntegrationTestBackupRestoreBase.java 
| 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index c70ac50dff40..cdef714c6240 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -415,8 +415,8 @@ private void delete(String[] backupIds, BackupAdmin client) throws IOException { /** * Verifies a snapshot's "data.manifest" file exists after a full backup has been performed for a * table. The "data.manifest" file's path will look like the following: - * .../backupRootDir/backup_1760572298945/default//.hbase-snapshot/ - * snapshot_1760572306407_default_/data.manifest + * .../backupRootDir/backup_1760572298945/default/TABLE_NAME/.hbase-snapshot/ + * snapshot_1760572306407_default_TABLE_NAME/data.manifest */ private void verifySnapshotExists(TableName tableName, String backupId) throws IOException { RemoteIterator fileStatusIterator = @@ -494,7 +494,8 @@ private void verifyBackupWALFiles(Path walPartitionDir) throws IOException { assertEquals("The WAL partition directory should only have files that start with 'wal_file'", "wal_file", splitName[0]); assertEquals( - "The timestamp in the WAL file's name should match the date for the WAL partition directory", + "The timestamp in the WAL file's name should match the " + + "date for the WAL partition directory", walPartitionDir.getName(), BackupUtils.formatToDateString(Long.parseLong(splitName[1]))); } } @@ -510,9 +511,8 @@ private void verifyBackupWALFiles(Path walPartitionDir) throws IOException { private boolean areAllTimestampsPastThreshold(Map timestamps, long threshold, TableName tableName) { boolean haveAllTimestampsReachedThreshold = true; - LOG.info( - "Checking if each region server in the replication check point has caught up to the latest Put on {}", - tableName.getNameAsString()); + LOG.info("Checking if each region server in the replication check point " + + "has caught up to the latest Put on {}", tableName.getNameAsString()); for (Map.Entry entry : timestamps.entrySet()) { LOG.info("host={}, checkpoint timestamp={}, latest put timestamp={}, caught up={}", entry.getKey(), entry.getValue(), threshold, entry.getValue() >= threshold); From b53ae0651d1c6b780f2469109d80799f4b6efd98 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Tue, 4 Nov 2025 12:17:25 -0800 Subject: [PATCH 17/19] Remove initializeConfFromCommandLine() Change-Id: Ibc96fd712e384cc3ca5a2c4575e47e65e62c60fa --- .../hbase/backup/IntegrationTestBackupRestore.java | 3 ++- .../backup/IntegrationTestBackupRestoreBase.java | 11 ----------- .../IntegrationTestContinuousBackupRestore.java | 3 ++- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java index 6b9935e7b972..99fbcb21a2b5 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestore.java @@ -45,7 +45,8 @@ public class IntegrationTestBackupRestore extends IntegrationTestBackupRestoreBa @Override @Before public void setUp() throws Exception { - initializeConfFromCommandLine(); + util = new 
IntegrationTestingUtility(); + conf = util.getConfiguration(); BackupTestUtil.enableBackup(conf); LOG.info("Initializing cluster with {} region servers.", regionServerCount); util.initializeCluster(regionServerCount); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index cdef714c6240..391eccdade66 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -144,17 +144,6 @@ public void run() { } } - protected void initializeConfFromCommandLine() { - util = new IntegrationTestingUtility(); - conf = util.getConfiguration(); - regionsCountPerServer = conf.getInt(REGION_COUNT_KEY, DEFAULT_REGION_COUNT); - regionServerCount = conf.getInt(REGIONSERVER_COUNT_KEY, DEFAULT_REGIONSERVER_COUNT); - rowsInIteration = conf.getInt(ROWS_PER_ITERATION_KEY, DEFAULT_ROWS_IN_ITERATION); - numIterations = conf.getInt(NUM_ITERATIONS_KEY, DEFAULT_NUM_ITERATIONS); - numTables = conf.getInt(NUMBER_OF_TABLES_KEY, DEFAULT_NUMBER_OF_TABLES); - sleepTime = conf.getLong(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT); - } - @After public void tearDown() throws IOException { LOG.info("Cleaning up after test."); diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java index fdcb5e7ebe06..61ce3eea2b15 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestContinuousBackupRestore.java @@ -54,7 +54,8 @@ public class IntegrationTestContinuousBackupRestore extends IntegrationTestBacku @Override @Before public void setUp() throws Exception { - initializeConfFromCommandLine(); + util = new IntegrationTestingUtility(); + conf = util.getConfiguration(); BackupTestUtil.enableBackup(conf); conf.set(CONF_BACKUP_MAX_WAL_SIZE, "10240"); conf.set(CONF_STAGED_WAL_FLUSH_INITIAL_DELAY, "10"); From add962cfc3f4f4a06fa1cd119d93b01290807cf7 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Tue, 4 Nov 2025 12:35:22 -0800 Subject: [PATCH 18/19] Change info log message to debug Change-Id: Ie8e94ce978836b1314525138726a13641360aae6 --- .../hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 391eccdade66..6c07a81abf9f 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -503,7 +503,7 @@ private boolean areAllTimestampsPastThreshold(Map timestamps, LOG.info("Checking if each region server in the replication check point " + "has caught up to the latest Put on {}", tableName.getNameAsString()); for (Map.Entry entry : timestamps.entrySet()) { - LOG.info("host={}, checkpoint timestamp={}, latest put timestamp={}, caught up={}", + LOG.debug("host={}, checkpoint timestamp={}, latest put timestamp={}, caught up={}", entry.getKey(), entry.getValue(), threshold, entry.getValue() >= 
threshold); if (entry.getValue() < threshold) { // Not returning right away so all hosts and timestamps are logged From 4cc466ec0c1e125259a2d908228285540f8cf892 Mon Sep 17 00:00:00 2001 From: Kevin Geiszler Date: Tue, 4 Nov 2025 16:45:24 -0800 Subject: [PATCH 19/19] Run mvn spotless:apply Change-Id: Ibeea379a65e801b60ec5124938b7aa17087025f0 --- .../hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java index 6c07a81abf9f..56349fe055a7 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/backup/IntegrationTestBackupRestoreBase.java @@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.IntegrationTestBase; -import org.apache.hadoop.hbase.IntegrationTestingUtility; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
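
The waiting logic PATCH 13/19 introduces (waitForCheckpointTimestampsToUpdate) is a bounded poll: re-read the per-region-server checkpoint timestamps from BackupSystemTable until every value reaches the timestamp of the latest Put, or fail after a fixed time budget. A minimal, test-friendly sketch of that pattern follows; it assumes the checkpoint map is keyed by ServerName with millisecond timestamps as values (as the patch's javadoc describes) and takes a java.util.function.Supplier so the source of the map is pluggable — the Supplier, the method name, and the parameters are illustrative stand-ins, not code from the patch.

    // Illustrative sketch only. Assumes imports of java.util.Map,
    // java.util.function.Supplier and org.apache.hadoop.hbase.ServerName.
    // Returns true once every checkpoint reaches thresholdTs, false if timeoutMs elapses first.
    static boolean waitUntilCheckpointsReach(Supplier<Map<ServerName, Long>> checkpoints,
      long thresholdTs, long timeoutMs, long pollMs) throws InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (System.currentTimeMillis() < deadline) {
        Map<ServerName, Long> snapshot = checkpoints.get();
        // Every region server must have replicated at least up to the latest Put timestamp.
        if (!snapshot.isEmpty() && snapshot.values().stream().allMatch(ts -> ts >= thresholdTs)) {
          return true;
        }
        Thread.sleep(pollMs);
      }
      return false;
    }

In the test itself, a false return would correspond to the timeout case where the patch throws a RuntimeException.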
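
A side note on the getLatestPutTimestamp() helper added in the same patch: it scans the whole table and keeps the maximum cell timestamp, but the ResultScanner it opens is never closed. The sketch below shows the same idea with the scanner closed via try-with-resources; it uses only the client classes the patch already imports (Table, Scan, ResultScanner, Result, Cell), and the method name here is illustrative rather than part of the patch.

    // Illustrative variant of the patch's helper that also closes the scanner.
    private static long latestCellTimestamp(Table table) throws IOException {
      long latest = 0L;
      try (ResultScanner scanner = table.getScanner(new Scan())) {
        for (Result result : scanner) {
          for (Cell cell : result.rawCells()) {
            // Keep the newest cell timestamp seen across all rows and column families.
            latest = Math.max(latest, cell.getTimestamp());
          }
        }
      }
      return latest;
    }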