@@ -187,11 +187,39 @@ public static int getDefaultBufferSize(final FileSystem fs) {
*/
public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm,
boolean overwrite) throws IOException {
return create(fs, path, perm, overwrite, true);
}

/**
* Create the specified file on the filesystem. By default, this will:
* <ol>
* <li>apply the umask in the configuration (if it is enabled)</li>
* <li>use the fs configured buffer size (or 4096 if not set)</li>
* <li>use the default replication</li>
* <li>use the default block size</li>
* <li>not track progress</li>
* </ol>
* @param fs {@link FileSystem} on which to write the file
* @param path {@link Path} to the file to write
* @param perm intial permissions
* @param overwrite Whether or not the created file should be overwritten.
* @param isRecursiveCreate recursively create parent directories
* @return output stream to the created file
* @throws IOException if the file cannot be created
*/
public static FSDataOutputStream create(FileSystem fs, Path path, FsPermission perm,
boolean overwrite, boolean isRecursiveCreate) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Creating file={} with permission={}, overwrite={}, recursive={}", path, perm,
overwrite, isRecursiveCreate);
}
if (isRecursiveCreate) {
return fs.create(path, perm, overwrite, getDefaultBufferSize(fs),
getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
} else {
return fs.createNonRecursive(path, perm, overwrite, getDefaultBufferSize(fs),
getDefaultReplication(fs, path), getDefaultBlockSize(fs, path), null);
}
}
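For illustration only (not part of the patch): a minimal sketch of how a caller could use the new overload to fail fast instead of creating missing parent directories. The class and method names below are hypothetical; only CommonFSUtils.create(fs, path, perm, overwrite, isRecursiveCreate) comes from the change above.

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.util.CommonFSUtils;

public final class NonRecursiveCreateSketch {
  /**
   * Writes a small marker file under a directory that is expected to exist already.
   * With isRecursiveCreate=false the create fails if parentDir is missing, instead of
   * silently creating the whole parent chain the way a recursive create would.
   */
  static void writeMarker(FileSystem fs, Path parentDir) throws IOException {
    Path file = new Path(parentDir, "marker");
    FsPermission perms = FsPermission.getFileDefault();
    try (FSDataOutputStream out = CommonFSUtils.create(fs, file, perms, true, false)) {
      out.write(new byte[] { 0x1 });
    }
  }
}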

/**
@@ -46,6 +46,7 @@
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
@@ -582,4 +583,104 @@ public void testCleanRestoreDir() throws Exception {
TableSnapshotInputFormat.cleanRestoreDir(job, snapshotName);
Assert.assertFalse(fs.exists(restorePath));
}

/**
* Test that explicitly restores a snapshot to a temp directory and reads the restored regions via
* ClientSideRegionScanner through a MapReduce job.
* <p>
* This test verifies the full workflow:
* <ol>
* <li>Create and load a table with data</li>
* <li>Create a snapshot and restore it to a temporary directory</li>
* <li>Configure a job to read the restored regions via ClientSideRegionScanner using
* TableSnapshotInputFormat and verify that it succeeds</li>
* <li>Delete the restored temporary directory</li>
* <li>Configure a new job and verify that it fails</li>
* </ol>
*/
@Test
public void testReadFromRestoredSnapshotViaMR() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
final String snapshotName = tableName + "_snapshot";
try {
if (UTIL.getAdmin().tableExists(tableName)) {
UTIL.deleteTable(tableName);
}
UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });

Admin admin = UTIL.getAdmin();
int regionNum = admin.getRegions(tableName).size();
LOG.info("Created table with {} regions", regionNum);

Table table = UTIL.getConnection().getTable(tableName);
UTIL.loadTable(table, FAMILIES);
table.close();

Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
null, snapshotName, rootDir, fs, true);
Path tempRestoreDir = UTIL.getDataTestDirOnTestFS("restore_" + snapshotName);
RestoreSnapshotHelper.copySnapshotForScanner(UTIL.getConfiguration(), fs, rootDir,
tempRestoreDir, snapshotName);
Assert.assertTrue("Restore directory should exist", fs.exists(tempRestoreDir));

Job job = Job.getInstance(UTIL.getConfiguration());
job.setJarByClass(TestTableSnapshotInputFormat.class);
TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
TestTableSnapshotInputFormat.class);
Scan scan = new Scan().withStartRow(getStartRow()).withStopRow(getEndRow());
Configuration conf = job.getConfiguration();
conf.set("hbase.TableSnapshotInputFormat.snapshot.name", snapshotName);
conf.set("hbase.TableSnapshotInputFormat.restore.dir", tempRestoreDir.toString());
conf.setInt("hbase.mapreduce.splits.per.region", 1);
job.setReducerClass(TestTableSnapshotReducer.class);
job.setNumReduceTasks(1);
job.setOutputFormatClass(NullOutputFormat.class);
TableMapReduceUtil.initTableMapperJob(snapshotName, // table name (snapshot name in this case)
scan, TestTableSnapshotMapper.class, ImmutableBytesWritable.class, NullWritable.class, job,
false, false, TableSnapshotInputFormat.class);
TableMapReduceUtil.resetCacheConfig(conf);
Assert.assertTrue(job.waitForCompletion(true));
Assert.assertTrue(job.isSuccessful());

// Now verify that job fails when restore directory is deleted
Assert.assertTrue(fs.delete(tempRestoreDir, true));
Assert.assertFalse("Restore directory should not exist after deletion",
fs.exists(tempRestoreDir));
Job failureJob = Job.getInstance(UTIL.getConfiguration());
failureJob.setJarByClass(TestTableSnapshotInputFormat.class);
TableMapReduceUtil.addDependencyJarsForClasses(failureJob.getConfiguration(),
TestTableSnapshotInputFormat.class);
Configuration failureConf = failureJob.getConfiguration();
// Configure job to use the deleted restore directory
failureConf.set("hbase.TableSnapshotInputFormat.snapshot.name", snapshotName);
failureConf.set("hbase.TableSnapshotInputFormat.restore.dir", tempRestoreDir.toString());
failureConf.setInt("hbase.mapreduce.splits.per.region", 1);
failureJob.setReducerClass(TestTableSnapshotReducer.class);
failureJob.setNumReduceTasks(1);
failureJob.setOutputFormatClass(NullOutputFormat.class);

TableMapReduceUtil.initTableMapperJob(snapshotName, scan, TestTableSnapshotMapper.class,
ImmutableBytesWritable.class, NullWritable.class, failureJob, false, false,
TableSnapshotInputFormat.class);
TableMapReduceUtil.resetCacheConfig(failureConf);

Assert.assertFalse("Restore directory should not exist before job execution",
fs.exists(tempRestoreDir));
failureJob.waitForCompletion(true);

Assert.assertFalse("Job should fail since the restored snapshot directory is deleted",
failureJob.isSuccessful());

} finally {
try {
if (UTIL.getAdmin().tableExists(tableName)) {
UTIL.deleteTable(tableName);
}
} catch (Exception e) {
LOG.warn("Error deleting table", e);
}
try {
UTIL.getAdmin().deleteSnapshot(snapshotName);
} catch (Exception e) {
LOG.warn("Error deleting snapshot", e);
}
}
}
}
@@ -29,6 +29,7 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
@@ -37,9 +38,12 @@
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -102,6 +106,7 @@ void fixHoles(CatalogJanitorReport report) {

final List<RegionInfo> newRegionInfos = createRegionInfosForHoles(holes);
final List<RegionInfo> newMetaEntries = createMetaEntries(masterServices, newRegionInfos);
createRegionDirectories(masterServices, newMetaEntries);
final TransitRegionStateProcedure[] assignProcedures =
masterServices.getAssignmentManager().createRoundRobinAssignProcedures(newMetaEntries);

@@ -217,6 +222,27 @@ private static List<RegionInfo> createMetaEntries(final MasterServices masterSer
return createMetaEntriesSuccesses;
}

private static void createRegionDirectories(final MasterServices masterServices,
final List<RegionInfo> regions) {
if (regions.isEmpty()) {
return;
}
final MasterFileSystem mfs = masterServices.getMasterFileSystem();
final Path rootDir = mfs.getRootDir();
for (RegionInfo regionInfo : regions) {
if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID) {
try {
Path tableDir = CommonFSUtils.getTableDir(rootDir, regionInfo.getTable());
HRegionFileSystem.createRegionOnFileSystem(masterServices.getConfiguration(),
mfs.getFileSystem(), tableDir, regionInfo);
} catch (IOException e) {
LOG.warn("Failed to create region directory for {}: {}",
regionInfo.getRegionNameAsString(), e.getMessage(), e);
}
}
}
}

/**
* Fix overlaps noted in CJ consistency report.
*/
@@ -109,6 +109,7 @@ assert getRegion().getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID || isFailed()
setNextState(TruncateRegionState.TRUNCATE_REGION_MAKE_ONLINE);
break;
case TRUNCATE_REGION_MAKE_ONLINE:
createRegionOnFileSystem(env);
addChildProcedure(createAssignProcedures(env));
setNextState(TruncateRegionState.TRUNCATE_REGION_POST_OPERATION);
break;
@@ -130,6 +131,20 @@
return Flow.HAS_MORE_STATE;
}

private void createRegionOnFileSystem(final MasterProcedureEnv env) throws IOException {
RegionStateNode regionNode =
env.getAssignmentManager().getRegionStates().getRegionStateNode(getRegion());
regionNode.lock();
try {
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
final Path tableDir = CommonFSUtils.getTableDir(mfs.getRootDir(), getTableName());
HRegionFileSystem.createRegionOnFileSystem(env.getMasterConfiguration(), mfs.getFileSystem(),
tableDir, getRegion());
} finally {
regionNode.unlock();
}
}

private void deleteRegionFromFileSystem(final MasterProcedureEnv env) throws IOException {
RegionStateNode regionNode =
env.getAssignmentManager().getRegionStates().getRegionStateNode(getRegion());
@@ -764,7 +764,10 @@ private static void writeRegionInfoFileContent(final Configuration conf, final F
// First check to get the permissions
FsPermission perms = CommonFSUtils.getFilePermissions(fs, conf, HConstants.DATA_FILE_UMASK_KEY);
// Write the RegionInfo file content
// HBASE-29662: Fail .regioninfo file creation, if the region directory doesn't exist,
// avoiding silent masking of missing region directories during region initialization.
// The region directory should already exist when this method is called.
try (FSDataOutputStream out = FSUtils.create(conf, fs, regionInfoFile, perms, null, false)) {
[Review comment from @apurtell (Contributor), Oct 27, 2025]
Add a comment here why this change, providing the JIRA identifier. The reason not to recursively create directories is important, we don't want someone refactoring this code later to miss that.

out.write(content);
}
}
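Background sketch for the HBASE-29662 comment above (illustrative, not part of the patch): the behavioural difference between a recursive and a non-recursive create when the region directory has gone missing. This uses the plain Hadoop FileSystem API; the exact exception raised by createNonRecursive depends on the FileSystem implementation (HDFS typically reports the missing parent as a FileNotFoundException).

import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public final class RegionInfoCreateSketch {
  // Old behaviour: fs.create(...) builds any missing parents, so a vanished region
  // directory is silently recreated around the new .regioninfo file and the problem is masked.
  static void writeRecursively(FileSystem fs, Path regionDir, byte[] content) throws IOException {
    try (FSDataOutputStream out = fs.create(new Path(regionDir, ".regioninfo"), true)) {
      out.write(content);
    }
  }

  // New behaviour: createNonRecursive(...) fails when regionDir does not exist,
  // so the missing directory is surfaced instead of papered over.
  static void writeNonRecursively(FileSystem fs, Path regionDir, byte[] content)
      throws IOException {
    Path regionInfoFile = new Path(regionDir, ".regioninfo");
    try (FSDataOutputStream out = fs.createNonRecursive(regionInfoFile,
        FsPermission.getFileDefault(), true, 4096, fs.getDefaultReplication(regionInfoFile),
        fs.getDefaultBlockSize(regionInfoFile), null)) {
      out.write(content);
    }
  }
}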
@@ -848,6 +851,14 @@ private void writeRegionInfoOnFilesystem(final byte[] regionInfoContent, final b
CommonFSUtils.delete(fs, tmpPath, true);
}

// Check parent (region) directory exists first to maintain HBASE-29662 protection
if (!fs.exists(getRegionDir())) {
throw new IOException("Region directory does not exist: " + getRegionDir());
}
if (!fs.exists(getTempDir())) {
fs.mkdirs(getTempDir());
[Review comment from a Contributor]
Curious, this was not covered so far?

[Reply from @gvprathyusha6 (Contributor Author), Nov 3, 2025]
This was created previously as a side effect of the recursive create in the filesystem call. Now that it's removed and we no longer blindly create every directory in the path, the tmp directory has to be created explicitly.

}

// Write HRI to a file in case we need to recover hbase:meta
writeRegionInfoFileContent(conf, fs, tmpPath, regionInfoContent);

@@ -212,6 +212,32 @@ public static boolean deleteRegionDir(final Configuration conf, final RegionInfo
*/
public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
FsPermission perm, InetSocketAddress[] favoredNodes) throws IOException {
return create(conf, fs, path, perm, favoredNodes, true);
}

/**
* Create the specified file on the filesystem. By default, this will:
* <ol>
* <li>overwrite the file if it exists</li>
* <li>apply the umask in the configuration (if it is enabled)</li>
* <li>use the fs configured buffer size (or 4096 if not set)</li>
* <li>use the configured column family replication or default replication if
* {@link ColumnFamilyDescriptorBuilder#DEFAULT_DFS_REPLICATION}</li>
* <li>use the default block size</li>
* <li>not track progress</li>
* </ol>
* @param conf configurations
* @param fs {@link FileSystem} on which to write the file
* @param path {@link Path} to the file to write
* @param perm permissions
* @param favoredNodes favored data nodes
* @param isRecursiveCreate recursively create parent directories
* @return output stream to the created file
* @throws IOException if the file cannot be created
*/
public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path path,
FsPermission perm, InetSocketAddress[] favoredNodes, boolean isRecursiveCreate)
throws IOException {
if (fs instanceof HFileSystem) {
FileSystem backingFs = ((HFileSystem) fs).getBackingFs();
if (backingFs instanceof DistributedFileSystem) {
@@ -230,7 +256,7 @@ public static FSDataOutputStream create(Configuration conf, FileSystem fs, Path
}

}
return CommonFSUtils.create(fs, path, perm, true, isRecursiveCreate);
}

/**
@@ -99,6 +99,7 @@
import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.mapreduce.MapreduceTestingShim;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@@ -109,6 +110,7 @@
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@@ -3722,4 +3724,22 @@ public static void await(final long sleepMillis, final BooleanSupplier condition
throw e;
}
}

public void createRegionDir(RegionInfo hri) throws IOException {
Path rootDir = getDataTestDir();
Path tableDir = CommonFSUtils.getTableDir(rootDir, hri.getTable());
Path regionDir = new Path(tableDir, hri.getEncodedName());
FileSystem fs = getTestFileSystem();
if (!fs.exists(regionDir)) {
fs.mkdirs(regionDir);
}
}

public void createRegionDir(RegionInfo regionInfo, MasterFileSystem masterFileSystem)
throws IOException {
Path tableDir =
CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(conf), regionInfo.getTable());
HRegionFileSystem.createRegionOnFileSystem(conf, masterFileSystem.getFileSystem(), tableDir,
regionInfo);
}
}
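Illustrative usage of the first helper above (not part of the patch; the same pattern appears in the test changes that follow). The sketch assumes the HBaseTestingUtility flavour of the test utility class; adjust the type name to whatever utility class the branch uses.

import java.io.IOException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;

public final class CreateRegionDirSketch {
  // Pre-creates the on-disk region directory for a synthetic region so that code which now
  // writes .regioninfo non-recursively (HBASE-29662) does not fail on a missing directory.
  static RegionInfo prepareRegion(HBaseTestingUtility util, TableName table) throws IOException {
    RegionInfo hri = RegionInfoBuilder.newBuilder(table).build();
    util.createRegionDir(hri); // helper added above; creates the region directory under the test data dir
    return hri;
  }
}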
@@ -76,12 +76,14 @@ public void before() throws IOException {
this.rss = new MockRegionServerServices(HTU.getConfiguration());
ChunkCreator.initialize(MemStoreLAB.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null,
MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT);
HTU.createRegionDir(ri);
this.region = HRegion.openHRegion(ri, td, null, HTU.getConfiguration(), this.rss, null);
}

@After
public void after() throws IOException {
this.region.close();
HTU.cleanupTestDir();
}

/**
@@ -172,6 +172,7 @@ private static RegionInfo makeOverlap(MasterServices services, RegionInfo a, Reg
throws IOException {
RegionInfo overlapRegion = RegionInfoBuilder.newBuilder(a.getTable())
.setStartKey(a.getStartKey()).setEndKey(b.getEndKey()).build();
TEST_UTIL.createRegionDir(overlapRegion, services.getMasterFileSystem());
MetaTableAccessor.putsToMetaTable(services.getConnection(),
Collections.singletonList(MetaTableAccessor.makePutFromRegionInfo(overlapRegion,
EnvironmentEdgeManager.currentTime())));
@@ -188,9 +188,11 @@ private HRegion initHRegion(TableDescriptor htd, RegionInfo info) throws IOExcep
CommonFSUtils.setRootDir(walConf, tableDir);
final WALFactory wals = new WALFactory(walConf, "log_" + info.getEncodedName());
HRegion region = new HRegion(fs, wals.getWAL(info), conf, htd, null);

Path regionDir = new Path(tableDir, info.getEncodedName());
if (!fs.getFileSystem().exists(regionDir)) {
fs.getFileSystem().mkdirs(regionDir);
}
region.initialize();

return region;
}

@@ -198,6 +198,7 @@ private HRegion initHRegion(TableDescriptor htd, RegionInfo info) throws IOExcep
.rename(eq(new Path(storeDir, ERROR_FILE)), any());

HRegionFileSystem fs = new HRegionFileSystem(conf, errFS, tableDir, info);
fs.createRegionOnFileSystem(conf, fs.getFileSystem(), tableDir, info);
final Configuration walConf = new Configuration(conf);
CommonFSUtils.setRootDir(walConf, tableDir);
final WALFactory wals = new WALFactory(walConf, "log_" + info.getEncodedName());