diff --git a/src/main/java/org/apache/hadoop/fs/FileSystemCleaner.java b/src/main/java/org/apache/hadoop/fs/FileSystemCleaner.java new file mode 100644 index 0000000..1ef0e85 --- /dev/null +++ b/src/main/java/org/apache/hadoop/fs/FileSystemCleaner.java @@ -0,0 +1,92 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +/** + * Manage FileSystemHolders that removed from PrestoFileSystemCache, and close them after PRESTO_HDFS_EXPIRED_FS_DELAY_CLOSE_TIME. + * To avoid Filesystem closed Exception caused by closing FileSystem in use. + */ +public class FileSystemCleaner +{ + public static final Log log = LogFactory.getLog(FileSystemCleaner.class); + public static final String PRESTO_HDFS_EXPIRED_FS_DELAY_CLOSE_TIME = "presto.hdfs.expired.fs.delay.close.time"; + public static final String PRESTO_HDFS_EXPIRED_FS_CHECK_INTERVAL = "presto.hdfs.expired.fs.check.interval"; + public static final long DEFAULT_PRESTO_HDFS_FS_CACHE_DELAY_CLOSE_TIME = 5 * 60 * 1000; // five minutes + public static final long DEFAULT_PRESTO_HDFS_EXPIRED_FS_CHECK_INTERVAL = 60 * 1000; // one minute + + private static final FileSystemCleaner manager = new FileSystemCleaner(); + private static final List fileSystemHolderList = new LinkedList<>(); + + public FileSystemCleaner() + { + Thread fileSystemCleanerTask = new FileSystemCleanerTask(); + fileSystemCleanerTask.setDaemon(true); + fileSystemCleanerTask.setName("FileSystemCleanerTask"); + fileSystemCleanerTask.start(); + } + + public static FileSystemCleaner getInstance() + { + return manager; + } + + public void addExpiredFileSystem(PrestoFileSystemCache.FileSystemHolder fileSystemHolder) + { + fileSystemHolder.setExpireTimestamp(System.currentTimeMillis()); + fileSystemHolderList.add(fileSystemHolder); + } + + private static class FileSystemCleanerTask + extends Thread + { + @Override + public void run() + { + while (true) { + for (PrestoFileSystemCache.FileSystemHolder holder : fileSystemHolderList) { + long delayCloseTime = holder.getFileSystem().getConf().getLong(PRESTO_HDFS_EXPIRED_FS_DELAY_CLOSE_TIME, + DEFAULT_PRESTO_HDFS_FS_CACHE_DELAY_CLOSE_TIME); + if (System.currentTimeMillis() - holder.getExpireTimestamp() >= delayCloseTime) { + try { + log.info(String.format("Closing expired FileSystem{expireTimestamp: %s, delayCloseTime: %s ms}", + holder.getExpireTimestamp(), delayCloseTime)); + holder.getFileSystem().close(); + fileSystemHolderList.remove(holder); + } + catch (IOException e) { + log.error("Close expired file system fail ", e); + } + } + else { + break; + } + } + + try { + Thread.sleep(DEFAULT_PRESTO_HDFS_EXPIRED_FS_CHECK_INTERVAL); + } + catch (InterruptedException e) { + log.error(e); + } + } + } + } +} diff --git a/src/main/java/org/apache/hadoop/fs/PrestoFileSystemCache.java b/src/main/java/org/apache/hadoop/fs/PrestoFileSystemCache.java index 4005e5a..11f270a 100644 --- a/src/main/java/org/apache/hadoop/fs/PrestoFileSystemCache.java +++ b/src/main/java/org/apache/hadoop/fs/PrestoFileSystemCache.java @@ -99,7 +99,8 @@ private synchronized FileSystem getInternal(URI uri, Configuration conf, long un // Kerberos re-login occurs, re-create the file system and cache it using // the same key. if (fileSystemRefresh(uri, conf, privateCredentials, fileSystemHolder)) { - map.remove(key); + FileSystemHolder expiredFSHolder = map.remove(key); + FileSystemCleaner.getInstance().addExpiredFileSystem(expiredFSHolder); FileSystem fileSystem = createFileSystem(uri, conf); fileSystemHolder = new FileSystemHolder(fileSystem, privateCredentials); map.put(key, fileSystemHolder); @@ -111,7 +112,9 @@ private synchronized FileSystem getInternal(URI uri, Configuration conf, long un private boolean fileSystemRefresh(URI uri, Configuration conf, Set privateCredentials, FileSystemHolder fileSystemHolder) { if (isHdfs(uri)) { - return !fileSystemHolder.getPrivateCredentials().equals(privateCredentials); + // privateCredentials size() will be more than fileSystemHolder.getPrivateCredentials(), + // but privateCredentials contains fileSystemHolder.getPrivateCredentials() + return !privateCredentials.containsAll(fileSystemHolder.getPrivateCredentials()); } if ("gs".equals(uri.getScheme())) { String existingGcsToken = fileSystemHolder.getFileSystem().getConf().get(PRESTO_GCS_OAUTH_ACCESS_TOKEN_KEY); @@ -147,21 +150,7 @@ private FileSystem createFileSystem(URI uri, Configuration conf) } final FileSystem original = (FileSystem) ReflectionUtils.newInstance(clazz, conf); original.initialize(uri, conf); - FileSystem wrapper = createPrestoFileSystemWrapper(original); - FinalizerService.getInstance().addFinalizer(wrapper, new Runnable() - { - @Override - public void run() - { - try { - original.close(); - } - catch (IOException e) { - log.error("Error occurred when finalizing file system", e); - } - } - }); - return wrapper; + return createPrestoFileSystemWrapper(original); } protected FileSystem createPrestoFileSystemWrapper(FileSystem original) @@ -305,10 +294,11 @@ public String toString() } } - private static class FileSystemHolder + protected static class FileSystemHolder { private final FileSystem fileSystem; private final Set privateCredentials; + private long expireTimestamp; // timestamp of remove from cache public FileSystemHolder(FileSystem fileSystem, Set privateCredentials) { @@ -332,7 +322,18 @@ public String toString() return toStringHelper(this) .add("fileSystem", fileSystem) .add("privateCredentials", privateCredentials) + .add("expireTimestamp", expireTimestamp) .toString(); } + + public void setExpireTimestamp(long expireTimestamp) + { + this.expireTimestamp = expireTimestamp; + } + + public long getExpireTimestamp() + { + return expireTimestamp; + } } }