From 1da81f291bb1852ddb2f9ac62e506c5332a8fd82 Mon Sep 17 00:00:00 2001 From: tian bao <2011xuesong@gmail.com> Date: Wed, 27 Aug 2025 17:32:58 +0800 Subject: [PATCH 1/9] cached fileindex cache version 2 cache ut change ut --- .../connector/hive/read/HiveFileIndex.scala | 3 +- .../hive/read/HiveFileStatusCache.scala | 142 ++++++++++++++++++ .../hive/HiveFileStatusCacheSuite.scala | 96 ++++++++++++ 3 files changed, 240 insertions(+), 1 deletion(-) create mode 100644 extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala create mode 100644 extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala index 0142c556194..217a2c9d137 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala @@ -48,7 +48,8 @@ class HiveCatalogFileIndex( private val partPathToBindHivePart: mutable.Map[PartitionPath, CatalogTablePartition] = mutable.Map() - private val fileStatusCache = FileStatusCache.getOrCreate(sparkSession) + private val fileStatusCache = + HiveFileStatusCache.getOrCreate(sparkSession, catalogTable.qualifiedName) private val baseLocation: Option[URI] = table.storage.locationUri diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala new file mode 100644 index 00000000000..0ef2743a455 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.spark.connector.hive.read + +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean + +import scala.collection.JavaConverters._ + +import com.google.common.cache._ +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} +import org.apache.spark.util.SizeEstimator + +/** + * Use [[HiveFileStatusCache.getOrCreate()]] to construct a globally shared file status cache. + */ +object HiveFileStatusCache { + private var sharedCache: HiveSharedInMemoryCache = _ + + /** + * @return a new FileStatusCache based on session configuration. Cache memory quota is + * shared across all clients. + */ + def getOrCreate(session: SparkSession, qualifiedName: String): FileStatusCache = + synchronized { + if (session.sessionState.conf.manageFilesourcePartitions && + session.sessionState.conf.filesourcePartitionFileCacheSize > 0) { + if (sharedCache == null) { + sharedCache = new HiveSharedInMemoryCache( + session.sessionState.conf.filesourcePartitionFileCacheSize, + session.sessionState.conf.metadataCacheTTL) + } + sharedCache.createForNewClient(qualifiedName) + } else { + NoopCache + } + } + + def resetForTesting(): Unit = synchronized { + sharedCache = null + } +} + +/** + * An implementation that caches partition file statuses in memory. + * + * @param maxSizeInBytes max allowable cache size before entries start getting evicted + */ +private class HiveSharedInMemoryCache(maxSizeInBytes: Long, cacheTTL: Long) extends Logging { + + // Opaque object that uniquely identifies a shared cache user + private type ClientId = Object + + private val warnedAboutEviction = new AtomicBoolean(false) + + // we use a composite cache key in order to distinguish entries inserted by different clients + private val cache: Cache[(ClientId, Path), Array[FileStatus]] = { + // [[Weigher]].weigh returns Int so we could only cache objects < 2GB + // instead, the weight is divided by this factor (which is smaller + // than the size of one [[FileStatus]]). + // so it will support objects up to 64GB in size. + val weightScale = 32 + val weigher = new Weigher[(ClientId, Path), Array[FileStatus]] { + override def weigh(key: (ClientId, Path), value: Array[FileStatus]): Int = { + val estimate = (SizeEstimator.estimate(key) + SizeEstimator.estimate(value)) / weightScale + if (estimate > Int.MaxValue) { + logWarning(s"Cached table partition metadata size is too big. Approximating to " + + s"${Int.MaxValue.toLong * weightScale}.") + Int.MaxValue + } else { + estimate.toInt + } + } + } + val removalListener = new RemovalListener[(ClientId, Path), Array[FileStatus]]() { + override def onRemoval( + removed: RemovalNotification[(ClientId, Path), Array[FileStatus]]): Unit = { + if (removed.getCause == RemovalCause.SIZE && + warnedAboutEviction.compareAndSet(false, true)) { + logWarning( + "Evicting cached table partition metadata from memory due to size constraints " + + "(spark.sql.hive.filesourcePartitionFileCacheSize = " + + maxSizeInBytes + " bytes). 
This may impact query planning performance.") + } + } + } + + var builder = CacheBuilder.newBuilder() + .weigher(weigher) + .removalListener(removalListener) + .maximumWeight(maxSizeInBytes / weightScale) + + if (cacheTTL > 0) { + builder = builder.expireAfterWrite(cacheTTL, TimeUnit.SECONDS) + } + + builder.build[(ClientId, Path), Array[FileStatus]]() + } + + /** + * @return a FileStatusCache that does not share any entries with any other client, but does + * share memory resources for the purpose of cache eviction. + */ + def createForNewClient(clientId: Object): HiveFileStatusCache = new HiveFileStatusCache { + + override def getLeafFiles(path: Path): Option[Array[FileStatus]] = { + Option(cache.getIfPresent((clientId, path))) + } + + override def putLeafFiles(path: Path, leafFiles: Array[FileStatus]): Unit = { + cache.put((clientId, path), leafFiles) + } + + override def invalidateAll(): Unit = { + cache.asMap.asScala.foreach { case (key, value) => + if (key._1 == clientId) { + cache.invalidate(key) + } + } + } + } + + abstract class HiveFileStatusCache extends FileStatusCache {} +} diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala new file mode 100644 index 00000000000..3bb66800625 --- /dev/null +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kyuubi.spark.connector.hive + +import scala.concurrent.duration.DurationInt + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.scalatest.concurrent.Eventually.eventually +import org.scalatest.concurrent.Futures.timeout + +import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache + +class HiveFileStatusCacheSuite extends KyuubiHiveTest { + + override def beforeEach(): Unit = { + super.beforeEach() + HiveFileStatusCache.resetForTesting() + } + + override def afterEach(): Unit = { + super.afterEach() + HiveFileStatusCache.resetForTesting() + } + + test("cached by qualifiedName") { + val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) + try { + // using 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L) + + val path = new Path("/dummy_tmp", "abc") + val files = (1 to 3).map(_ => new FileStatus()) + + HiveFileStatusCache.resetForTesting() + val fileStatusCacheTabel1 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1") + fileStatusCacheTabel1.putLeafFiles(path, files.toArray) + val fileStatusCacheTabel2 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1") + val fileStatusCacheTabel3 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table2") + + // Exactly 3 files are cached. + assert(fileStatusCacheTabel1.getLeafFiles(path).get.length === 3) + assert(fileStatusCacheTabel2.getLeafFiles(path).get.length === 3) + assert(fileStatusCacheTabel3.getLeafFiles(path).isEmpty === true) + // Wait until the cache expiration. + eventually(timeout(3.seconds)) { + // And the cache is gone. + assert(fileStatusCacheTabel1.getLeafFiles(path).isEmpty === true) + assert(fileStatusCacheTabel2.getLeafFiles(path).isEmpty === true) + assert(fileStatusCacheTabel3.getLeafFiles(path).isEmpty === true) + } + } finally { + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue) + } + } + + test("expire FileStatusCache if TTL is configured") { + val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) + try { + // using 'SQLConf.get.setConf' instead of 'withSQLConf' to set a static config at runtime + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, 1L) + + val path = new Path("/dummy_tmp", "abc") + val files = (1 to 3).map(_ => new FileStatus()) + + HiveFileStatusCache.resetForTesting() + val fileStatusCache = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table") + fileStatusCache.putLeafFiles(path, files.toArray) + + // Exactly 3 files are cached. + assert(fileStatusCache.getLeafFiles(path).get.length === 3) + // Wait until the cache expiration. + eventually(timeout(3.seconds)) { + // And the cache is gone. 
+ assert(fileStatusCache.getLeafFiles(path).isEmpty === true) + } + } finally { + SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue) + } + } +} From 9353f0c79cce190f9883c463c60261cf54f7584f Mon Sep 17 00:00:00 2001 From: tian bao <2011xuesong@gmail.com> Date: Tue, 9 Sep 2025 16:01:12 +0800 Subject: [PATCH 2/9] fix failed tests --- .../kyuubi/spark/connector/hive/HiveTableCatalog.scala | 6 +++++- .../connector/hive/HiveFileStatusCacheSuite.scala | 10 ---------- .../kyuubi/spark/connector/hive/KyuubiHiveTest.scala | 3 +++ 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index f72881f928f..bbd9df41d05 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -395,11 +395,15 @@ class HiveTableCatalog(sparkSession: SparkSession) override def dropTable(ident: Identifier): Boolean = withSparkSQLConf(LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME -> "true") { try { - if (loadTable(ident) != null) { + val table = loadTable(ident) + if (table != null) { catalog.dropTable( ident.asTableIdentifier, ignoreIfNotExists = true, purge = true /* skip HDFS trash */ ) + if (table.isInstanceOf[HiveTable]) { + table.asInstanceOf[HiveTable].fileIndex.refresh() + } true } else { false diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala index 3bb66800625..3706bed4556 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala @@ -28,16 +28,6 @@ import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache class HiveFileStatusCacheSuite extends KyuubiHiveTest { - override def beforeEach(): Unit = { - super.beforeEach() - HiveFileStatusCache.resetForTesting() - } - - override def afterEach(): Unit = { - super.afterEach() - HiveFileStatusCache.resetForTesting() - } - test("cached by qualifiedName") { val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) try { diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala index fb5bcd62184..68d01f1cc78 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.connector.catalog.{SupportsNamespaces, TableCatalog} import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.Utils import org.apache.kyuubi.spark.connector.common.LocalSparkSession +import 
org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache abstract class KyuubiHiveTest extends QueryTest with Logging { @@ -49,11 +50,13 @@ abstract class KyuubiHiveTest extends QueryTest with Logging { override def beforeEach(): Unit = { super.beforeAll() + HiveFileStatusCache.resetForTesting() getOrCreateSpark() } override def afterEach(): Unit = { super.afterAll() + HiveFileStatusCache.resetForTesting() LocalSparkSession.stop(innerSpark) } From 9f2168c7fd2995321b6d28a52fd6d627fe5be7a3 Mon Sep 17 00:00:00 2001 From: yangyx <360508847@qq.com> Date: Mon, 22 Sep 2025 14:22:16 +0800 Subject: [PATCH 3/9] Improve the lifecycle management of cache --- .../connector/hive/HiveTableCatalog.scala | 20 +- .../connector/hive/read/HiveFileIndex.scala | 4 +- .../connector/hive/write/HiveBatchWrite.scala | 6 +- .../connector/hive/HiveCatalogSuite.scala | 35 ++-- .../hive/HiveFileStatusCacheSuite.scala | 180 +++++++++++++++++- .../spark/connector/hive/KyuubiHiveTest.scala | 1 + 6 files changed, 217 insertions(+), 29 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index bbd9df41d05..847c63dd9f6 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -20,11 +20,9 @@ package org.apache.kyuubi.spark.connector.hive import java.lang.{Boolean => JBoolean, Long => JLong} import java.net.URI import java.util - import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Try - import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf import org.apache.spark.internal.Logging @@ -45,10 +43,10 @@ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOBAL_TEMP_DATABASE} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap - import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf -import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper} +import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper, getStorageFormatAndProvider, toCatalogDatabase} import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature +import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors} /** @@ -388,7 +386,7 @@ class HiveTableCatalog(sparkSession: SparkSession) case _: NoSuchTableException => throw new NoSuchTableException(ident) } - + invalidateTable(ident) loadTable(ident) } @@ -401,9 +399,7 @@ class HiveTableCatalog(sparkSession: SparkSession) ident.asTableIdentifier, ignoreIfNotExists = true, purge = true /* skip HDFS trash */ ) - if (table.isInstanceOf[HiveTable]) { - table.asInstanceOf[HiveTable].fileIndex.refresh() - } + invalidateTable(ident) true } else { false @@ -421,10 +417,16 @@ class HiveTableCatalog(sparkSession: SparkSession) } // Load table to make sure the table exists - 
loadTable(oldIdent) + val table = loadTable(oldIdent) catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier) + invalidateTable(oldIdent) } + override def invalidateTable(ident: Identifier): Unit = { + val qualifiedName = s"$catalogName.${ident.namespace().mkString(".")}.${ident.name()}" + HiveFileStatusCache.getOrCreate(sparkSession, qualifiedName).invalidateAll() + } + private def toOptions(properties: Map[String, String]): Map[String, String] = { properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map { case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala index 217a2c9d137..66074f0e5bb 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala @@ -48,8 +48,8 @@ class HiveCatalogFileIndex( private val partPathToBindHivePart: mutable.Map[PartitionPath, CatalogTablePartition] = mutable.Map() - private val fileStatusCache = - HiveFileStatusCache.getOrCreate(sparkSession, catalogTable.qualifiedName) + private val fileStatusCache = HiveFileStatusCache.getOrCreate(sparkSession, + hiveCatalog.name() + "." + catalogTable.qualifiedName) private val baseLocation: Option[URI] = table.storage.locationUri diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala index 2a30ac434c8..9b35556cffb 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala @@ -18,7 +18,6 @@ package org.apache.kyuubi.spark.connector.hive.write import scala.util.control.NonFatal - import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf @@ -32,8 +31,8 @@ import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTas import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite import org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.toSQLValue import org.apache.spark.sql.types.StringType - import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, HiveTableCatalog, KyuubiHiveConnectorException} +import org.apache.spark.sql.connector.catalog.Identifier class HiveBatchWrite( sparkSession: SparkSession, @@ -69,6 +68,9 @@ class HiveBatchWrite( // un-cache this table. 
hiveTableCatalog.catalog.invalidateCachedTable(table.identifier) + hiveTableCatalog.invalidateTable( + Identifier.of(Array(table.identifier.database.getOrElse("")), table.identifier.table) + ) val catalog = hiveTableCatalog.catalog if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala index da53b898927..e4843619677 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.IdentifierHelper import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.{READ_CONVERT_METASTORE_ORC, READ_CONVERT_METASTORE_PARQUET} -import org.apache.kyuubi.spark.connector.hive.read.HiveScan +import org.apache.kyuubi.spark.connector.hive.read.{HiveFileStatusCache, HiveScan} class HiveCatalogSuite extends KyuubiHiveTest { @@ -284,16 +284,29 @@ class HiveCatalogSuite extends KyuubiHiveTest { } test("invalidateTable") { - val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) - // Hive v2 don't cache table - catalog.invalidateTable(testIdent) - - val loaded = catalog.loadTable(testIdent) - - assert(table.name == loaded.name) - assert(table.schema == loaded.schema) - assert(table.properties == loaded.properties) - catalog.dropTable(testIdent) + withSparkSession() { spark => + val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) + val qualifiedName = s"$catalogName.${testIdent.namespace().mkString(".")}.${testIdent.name()}" + val location = table.asInstanceOf[HiveTable].catalogTable.location + + spark.sql(s"select * from $qualifiedName").collect() + assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName) + .getLeafFiles(new Path(location)).isDefined) + + catalog.invalidateTable(testIdent) + assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName) + .getLeafFiles(new Path(location)).isEmpty) + + spark.sql(s"select * from $qualifiedName").collect() + assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName) + .getLeafFiles(new Path(location)).isDefined) + + val loaded = catalog.loadTable(testIdent) + assert(table.name == loaded.name) + assert(table.schema == loaded.schema) + assert(table.properties == loaded.properties) + catalog.dropTable(testIdent) + } } test("listNamespaces: fail if missing namespace") { diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala index 3706bed4556..31b9cea45c7 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala @@ -17,14 +17,15 @@ package org.apache.kyuubi.spark.connector.hive +import com.google.common.collect.Maps import scala.concurrent.duration.DurationInt - import 
org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.scalatest.concurrent.Eventually.eventually import org.scalatest.concurrent.Futures.timeout - import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache +import org.apache.spark.sql.connector.catalog.Identifier +import org.apache.spark.sql.util.CaseInsensitiveStringMap class HiveFileStatusCacheSuite extends KyuubiHiveTest { @@ -38,9 +39,9 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { val files = (1 to 3).map(_ => new FileStatus()) HiveFileStatusCache.resetForTesting() - val fileStatusCacheTabel1 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1") + val fileStatusCacheTabel1 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.cat1Table") fileStatusCacheTabel1.putLeafFiles(path, files.toArray) - val fileStatusCacheTabel2 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table1") + val fileStatusCacheTabel2 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.cat1Table") val fileStatusCacheTabel3 = HiveFileStatusCache.getOrCreate(spark, "catalog.db.table2") // Exactly 3 files are cached. @@ -83,4 +84,173 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue) } } -} + + private def newCatalog(): HiveTableCatalog = { + val catalog = new HiveTableCatalog + val properties = Maps.newHashMap[String, String]() + properties.put("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true") + properties.put("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver") + catalog.initialize(catalogName, new CaseInsensitiveStringMap(properties)) + catalog + } + + test("expire FileStatusCache when insert into") { + val dbName = "default" + val tbName = "tbl_partition" + val table = s"${catalogName}.${dbName}.${tbName}" + + withTable(table) { + spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect() + val location = newCatalog() + .loadTable(Identifier.of(Array(dbName), tbName)) + .asInstanceOf[HiveTable] + .catalogTable.location.toString + + spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isEmpty) + + assert(spark.sql(s"select * from $table").count() === 5) + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1) + + spark.sql(s"insert into $table partition(city='ct') values(11),(21),(31),(41),(51)").collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isEmpty) + + assert(spark.sql(s"select * from $table").count() === 10) + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).get.length === 2) + } + } + + test("expire FileStatusCache when insert overwrite") { + val dbName = "default" + val tbName = "tbl_partition" + val table = s"${catalogName}.${dbName}.${tbName}" + + withTable(table) { + spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect() + val location = newCatalog() + .loadTable(Identifier.of(Array(dbName), tbName)) + .asInstanceOf[HiveTable] + .catalogTable.location.toString + + spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new 
Path(s"$location/city=ct")).isEmpty) + + assert(spark.sql(s"select * from $table").count() === 5) + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1) + + spark.sql(s"insert overwrite $table partition(city='ct') values(11),(21),(31),(41),(51)") + .collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isEmpty) + + assert(spark.sql(s"select * from $table").count() === 5) + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1) + } + } + + test("expire FileStatusCache when alter Table") { + val dbName = "default" + val tbName = "tbl_partition" + val table = s"${catalogName}.${dbName}.${tbName}" + + withTable(table) { + spark.sql(s"create table $table (age int)partitioned by(city string) stored as orc").collect() + val location = newCatalog() + .loadTable(Identifier.of(Array(dbName), tbName)) + .asInstanceOf[HiveTable] + .catalogTable.location.toString + + spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect() + spark.sql(s"select * from $table").collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1) + + spark.sql(s"ALTER TABLE $table ADD COLUMNS (name string)").collect() + assert(HiveFileStatusCache.getOrCreate(spark, table) + .getLeafFiles(new Path(s"$location/city=ct")).isEmpty) + } + } + + test("expire FileStatusCache when rename Table") { + val dbName = "default" + val oldTbName = "tbl_partition" + val newTbName = "tbl_partition_new" + val oldTable = s"${catalogName}.${dbName}.${oldTbName}" + val newTable = s"${catalogName}.${dbName}.${newTbName}" + + withTable(newTable) { + spark.sql(s"create table ${oldTable} (age int)partitioned by(city string) stored as orc") + .collect() + spark.sql(s"insert into $oldTable partition(city='ct') values(10),(20),(30),(40),(50)") + .collect() + spark.sql(s"select * from $oldTable").collect() + + val oldLocation = newCatalog() + .loadTable(Identifier.of(Array(dbName), oldTbName)) + .asInstanceOf[HiveTable] + .catalogTable.location.toString + assert(HiveFileStatusCache.getOrCreate(spark, oldTable) + .getLeafFiles(new Path(s"$oldLocation/city=ct")).get.length === 1) + + spark.sql(s"DROP TABLE IF EXISTS ${newTable}").collect() + spark.sql(s"use ${catalogName}.${dbName}").collect() + spark.sql(s"ALTER TABLE $oldTbName RENAME TO $newTbName").collect() + val newLocation = newCatalog() + .loadTable(Identifier.of(Array(dbName), newTbName)) + .asInstanceOf[HiveTable] + .catalogTable.location.toString + + assert(HiveFileStatusCache.getOrCreate(spark, oldTable) + .getLeafFiles(new Path(s"$oldLocation/city=ct")) + .isEmpty) + + assert(HiveFileStatusCache.getOrCreate(spark, newTable) + .getLeafFiles(new Path(s"$newLocation/city=ct")) + .isEmpty) + } + } + + test("FileStatusCache isolated between different catalogs with same database.table") { + val catalog1 = catalogName + val catalog2 = "hive2" + val dbName = "default" + val tbName = "tbl_partition" + val dbTableShortName = s"${dbName}.${tbName}" + val cat1Table = s"${catalog1}.${dbTableShortName}" + val cat2Table = s"${catalog2}.${dbTableShortName}" + + withTable(cat1Table, cat2Table) { + spark.sql(s"CREATE TABLE IF NOT EXISTS $cat1Table (age int)partitioned by(city string)" + + s" stored as orc").collect() + val location = newCatalog() + .loadTable(Identifier.of(Array(dbName), tbName)) + .asInstanceOf[HiveTable] + 
.catalogTable.location.toString + + spark.sql(s"use $catalog1").collect() + spark.sql(s"insert into $dbTableShortName partition(city='ct1') " + + s"values(11),(12),(13),(14),(15)").collect() + spark.sql(s"select * from $cat1Table where city='ct1'").collect() + assert(HiveFileStatusCache.getOrCreate(spark, cat1Table) + .getLeafFiles(new Path(s"$location/city=ct1")) + .get.length === 1) + + spark.sql(s"use $catalog2").collect() + spark.sql(s"insert into $dbTableShortName partition(city='ct2') " + + s"values(21),(22),(23),(24),(25)").collect() + spark.sql(s"select * from $cat2Table where city='ct2'").collect() + assert(HiveFileStatusCache.getOrCreate(spark, cat2Table) + .getLeafFiles(new Path(s"$location/city=ct1")).isEmpty) + assert(HiveFileStatusCache.getOrCreate(spark, cat2Table) + .getLeafFiles(new Path(s"$location/city=ct2")) + .get.length === 1) + } + } +} \ No newline at end of file diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala index 68d01f1cc78..fa9a05e2d13 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveTest.scala @@ -66,6 +66,7 @@ abstract class KyuubiHiveTest extends QueryTest with Logging { .set("spark.ui.enabled", "false") .set("spark.sql.catalogImplementation", "hive") .set("spark.sql.catalog.hive", classOf[HiveTableCatalog].getName) + .set("spark.sql.catalog.hive2", classOf[HiveTableCatalog].getName) .set("javax.jdo.option.ConnectionURL", "jdbc:derby:memory:memorydb;create=true") .set("javax.jdo.option.ConnectionDriverName", "org.apache.derby.jdbc.EmbeddedDriver") From 92cbdd978dbb277354413368b6bbf886ed3ac22c Mon Sep 17 00:00:00 2001 From: tian bao <2011xuesong@gmail.com> Date: Fri, 10 Oct 2025 16:53:39 +0800 Subject: [PATCH 4/9] review change and format code --- .../connector/hive/HiveTableCatalog.scala | 10 ++- .../hive/KyuubiHiveConnectorConf.scala | 12 +++ .../hive/read/HiveFileStatusCache.scala | 23 +++++- .../connector/hive/HiveCatalogSuite.scala | 7 +- .../hive/HiveFileStatusCacheSuite.scala | 77 +++++++++++++------ 5 files changed, 95 insertions(+), 34 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index 847c63dd9f6..89a49f352ed 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -20,9 +20,11 @@ package org.apache.kyuubi.spark.connector.hive import java.lang.{Boolean => JBoolean, Long => JLong} import java.net.URI import java.util + import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Try + import org.apache.hadoop.conf.Configuration import org.apache.spark.SparkConf import org.apache.spark.internal.Logging @@ -33,7 +35,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import 
org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.util.quoteIfNeeded -import org.apache.spark.sql.connector.catalog.{Identifier, NamespaceChange, SupportsNamespaces, Table, TableCatalog, TableChange} +import org.apache.spark.sql.connector.catalog._ import org.apache.spark.sql.connector.catalog.NamespaceChange.RemoveProperty import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.execution.command.DDLUtils @@ -43,8 +45,9 @@ import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} import org.apache.spark.sql.internal.StaticSQLConf.{CATALOG_IMPLEMENTATION, GLOBAL_TEMP_DATABASE} import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap + import org.apache.kyuubi.spark.connector.hive.HiveConnectorUtils.withSparkSQLConf -import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper, getStorageFormatAndProvider, toCatalogDatabase} +import org.apache.kyuubi.spark.connector.hive.HiveTableCatalog.{getStorageFormatAndProvider, toCatalogDatabase, CatalogDatabaseHelper, IdentifierHelper, NamespaceHelper} import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorDelegationTokenProvider.metastoreTokenSignature import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors} @@ -423,7 +426,8 @@ class HiveTableCatalog(sparkSession: SparkSession) } override def invalidateTable(ident: Identifier): Unit = { - val qualifiedName = s"$catalogName.${ident.namespace().mkString(".")}.${ident.name()}" + super.invalidateTable(ident) + val qualifiedName = s"$catalogName.$ident" HiveFileStatusCache.getOrCreate(sparkSession, qualifiedName).invalidateAll() } diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala index 98968a7d41c..e3aff5e5e8a 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala @@ -57,4 +57,16 @@ object KyuubiHiveConnectorConf { .version("1.11.0") .booleanConf .createWithDefault(true) + + val HIVE_FILE_STATUS_CACHE_SCOPE = + buildConf("spark.sql.kyuubi.hive.file.status.cache.scope") + .doc("The scope of hive file status cache, global, session and none.") + .version("1.11.0") + .stringConf + .transform(policy => policy.toUpperCase(Locale.ROOT)) + .checkValue( + policy => Set("SESSION", "NONE").contains(policy), + "Invalid value for 'spark.sql.kyuubi.hive.file.status.cache.scope'." 
+ + "Valid values are 'SESSION', 'NONE'.") + .createWithDefault("SESSION") } diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala index 0ef2743a455..6a86e0d7224 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala @@ -29,8 +29,20 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} import org.apache.spark.util.SizeEstimator +import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.HIVE_FILE_STATUS_CACHE_SCOPE + /** - * Use [[HiveFileStatusCache.getOrCreate()]] to construct a globally shared file status cache. + * Forked from Apache Spark's [[org.apache.spark.sql.execution.datasources.FileStatusCache]] 3.5.5. + * + * Because the original FileStatusCache cannot take effect (see https://github.com/apache/kyuubi + * /issues/7192). + * + * The main modification point is that at the session level, the cache key is the qualified name + * of the table (in the form of `catalog.database.table`) + path. The previous key was an + * object + path generated during initialization, and the current scenario is that FileStatusCache + * is not preserved by the outside, resulting in different keys and ineffective caching. + * + * Use [[HiveFileStatusCache.getOrCreate()]] to construct a session/none shared file status cache. */ object HiveFileStatusCache { private var sharedCache: HiveSharedInMemoryCache = _ @@ -41,14 +53,17 @@ object HiveFileStatusCache { */ def getOrCreate(session: SparkSession, qualifiedName: String): FileStatusCache = synchronized { - if (session.sessionState.conf.manageFilesourcePartitions && - session.sessionState.conf.filesourcePartitionFileCacheSize > 0) { + val conf = session.sessionState.conf + if (conf.manageFilesourcePartitions && conf.filesourcePartitionFileCacheSize > 0) { if (sharedCache == null) { sharedCache = new HiveSharedInMemoryCache( session.sessionState.conf.filesourcePartitionFileCacheSize, session.sessionState.conf.metadataCacheTTL) } - sharedCache.createForNewClient(qualifiedName) + conf.getConf(HIVE_FILE_STATUS_CACHE_SCOPE) match { + case "SESSION" => sharedCache.createForNewClient(qualifiedName) + case "NONE" => NoopCache + } } else { NoopCache } diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala index e4843619677..e4e41f5f562 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveCatalogSuite.scala @@ -286,7 +286,7 @@ class HiveCatalogSuite extends KyuubiHiveTest { test("invalidateTable") { withSparkSession() { spark => val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps) - val qualifiedName = s"$catalogName.${testIdent.namespace().mkString(".")}.${testIdent.name()}" + val qualifiedName = s"$catalogName.$testIdent" val 
location = table.asInstanceOf[HiveTable].catalogTable.location spark.sql(s"select * from $qualifiedName").collect() @@ -294,13 +294,10 @@ class HiveCatalogSuite extends KyuubiHiveTest { .getLeafFiles(new Path(location)).isDefined) catalog.invalidateTable(testIdent) + // invalidate filestatus cache assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName) .getLeafFiles(new Path(location)).isEmpty) - spark.sql(s"select * from $qualifiedName").collect() - assert(HiveFileStatusCache.getOrCreate(spark, qualifiedName) - .getLeafFiles(new Path(location)).isDefined) - val loaded = catalog.loadTable(testIdent) assert(table.name == loaded.name) assert(table.schema == loaded.schema) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala index 31b9cea45c7..532095087e1 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala @@ -17,18 +17,48 @@ package org.apache.kyuubi.spark.connector.hive -import com.google.common.collect.Maps import scala.concurrent.duration.DurationInt + +import com.google.common.collect.Maps import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} +import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.scalatest.concurrent.Eventually.eventually import org.scalatest.concurrent.Futures.timeout + +import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.HIVE_FILE_STATUS_CACHE_SCOPE import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache -import org.apache.spark.sql.connector.catalog.Identifier -import org.apache.spark.sql.util.CaseInsensitiveStringMap class HiveFileStatusCacheSuite extends KyuubiHiveTest { + test("use different cache scope") { + Seq("SESSION", "NONE").foreach { value => + withSparkSession(Map(HIVE_FILE_STATUS_CACHE_SCOPE.key -> value)) { _ => + val path = new Path("/dummy_tmp", "abc") + val files = (1 to 3).map(_ => new FileStatus()) + + HiveFileStatusCache.resetForTesting() + val fileStatusCacheTabel = HiveFileStatusCache.getOrCreate(spark, "catalog.db.catTable") + fileStatusCacheTabel.putLeafFiles(path, files.toArray) + + value match { + // Exactly 3 files are cached. + case "SESSION" => + assert(fileStatusCacheTabel.getLeafFiles(path).get.length === 3) + case "NONE" => + assert(fileStatusCacheTabel.getLeafFiles(path).isEmpty) + case _ => + throw new IllegalArgumentException( + s"Unexpected value: '$value'. Only 'SESSION' or 'NONE' are allowed.") + } + + fileStatusCacheTabel.invalidateAll() + assert(fileStatusCacheTabel.getLeafFiles(path).isEmpty) + } + } + } + test("cached by qualifiedName") { val previousValue = SQLConf.get.getConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS) try { @@ -47,13 +77,13 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { // Exactly 3 files are cached. assert(fileStatusCacheTabel1.getLeafFiles(path).get.length === 3) assert(fileStatusCacheTabel2.getLeafFiles(path).get.length === 3) - assert(fileStatusCacheTabel3.getLeafFiles(path).isEmpty === true) + assert(fileStatusCacheTabel3.getLeafFiles(path).isEmpty) // Wait until the cache expiration. 
eventually(timeout(3.seconds)) { // And the cache is gone. - assert(fileStatusCacheTabel1.getLeafFiles(path).isEmpty === true) - assert(fileStatusCacheTabel2.getLeafFiles(path).isEmpty === true) - assert(fileStatusCacheTabel3.getLeafFiles(path).isEmpty === true) + assert(fileStatusCacheTabel1.getLeafFiles(path).isEmpty) + assert(fileStatusCacheTabel2.getLeafFiles(path).isEmpty) + assert(fileStatusCacheTabel3.getLeafFiles(path).isEmpty) } } finally { SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue) @@ -78,7 +108,7 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { // Wait until the cache expiration. eventually(timeout(3.seconds)) { // And the cache is gone. - assert(fileStatusCache.getLeafFiles(path).isEmpty === true) + assert(fileStatusCache.getLeafFiles(path).isEmpty) } } finally { SQLConf.get.setConf(StaticSQLConf.METADATA_CACHE_TTL_SECONDS, previousValue) @@ -112,15 +142,16 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { assert(spark.sql(s"select * from $table").count() === 5) assert(HiveFileStatusCache.getOrCreate(spark, table) - .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1) + .getLeafFiles(new Path(s"$location/city=ct")).isDefined) + // should clear cache spark.sql(s"insert into $table partition(city='ct') values(11),(21),(31),(41),(51)").collect() assert(HiveFileStatusCache.getOrCreate(spark, table) .getLeafFiles(new Path(s"$location/city=ct")).isEmpty) assert(spark.sql(s"select * from $table").count() === 10) assert(HiveFileStatusCache.getOrCreate(spark, table) - .getLeafFiles(new Path(s"$location/city=ct")).get.length === 2) + .getLeafFiles(new Path(s"$location/city=ct")).isDefined) } } @@ -142,8 +173,9 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { assert(spark.sql(s"select * from $table").count() === 5) assert(HiveFileStatusCache.getOrCreate(spark, table) - .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1) + .getLeafFiles(new Path(s"$location/city=ct")).isDefined) + // should clear cache spark.sql(s"insert overwrite $table partition(city='ct') values(11),(21),(31),(41),(51)") .collect() assert(HiveFileStatusCache.getOrCreate(spark, table) @@ -151,7 +183,7 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { assert(spark.sql(s"select * from $table").count() === 5) assert(HiveFileStatusCache.getOrCreate(spark, table) - .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1) + .getLeafFiles(new Path(s"$location/city=ct")).isDefined) } } @@ -170,8 +202,9 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { spark.sql(s"insert into $table partition(city='ct') values(10),(20),(30),(40),(50)").collect() spark.sql(s"select * from $table").collect() assert(HiveFileStatusCache.getOrCreate(spark, table) - .getLeafFiles(new Path(s"$location/city=ct")).get.length === 1) + .getLeafFiles(new Path(s"$location/city=ct")).isDefined) + // should clear cache spark.sql(s"ALTER TABLE $table ADD COLUMNS (name string)").collect() assert(HiveFileStatusCache.getOrCreate(spark, table) .getLeafFiles(new Path(s"$location/city=ct")).isEmpty) @@ -182,8 +215,8 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { val dbName = "default" val oldTbName = "tbl_partition" val newTbName = "tbl_partition_new" - val oldTable = s"${catalogName}.${dbName}.${oldTbName}" - val newTable = s"${catalogName}.${dbName}.${newTbName}" + val oldTable = s"$catalogName.$dbName.$oldTbName" + val newTable = s"$catalogName.$dbName.$newTbName" withTable(newTable) { spark.sql(s"create table ${oldTable} (age int)partitioned 
by(city string) stored as orc") @@ -197,7 +230,7 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { .asInstanceOf[HiveTable] .catalogTable.location.toString assert(HiveFileStatusCache.getOrCreate(spark, oldTable) - .getLeafFiles(new Path(s"$oldLocation/city=ct")).get.length === 1) + .getLeafFiles(new Path(s"$oldLocation/city=ct")).isDefined) spark.sql(s"DROP TABLE IF EXISTS ${newTable}").collect() spark.sql(s"use ${catalogName}.${dbName}").collect() @@ -228,7 +261,7 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { withTable(cat1Table, cat2Table) { spark.sql(s"CREATE TABLE IF NOT EXISTS $cat1Table (age int)partitioned by(city string)" + - s" stored as orc").collect() + s" stored as orc").collect() val location = newCatalog() .loadTable(Identifier.of(Array(dbName), tbName)) .asInstanceOf[HiveTable] @@ -236,21 +269,21 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { spark.sql(s"use $catalog1").collect() spark.sql(s"insert into $dbTableShortName partition(city='ct1') " + - s"values(11),(12),(13),(14),(15)").collect() + s"values(11),(12),(13),(14),(15)").collect() spark.sql(s"select * from $cat1Table where city='ct1'").collect() assert(HiveFileStatusCache.getOrCreate(spark, cat1Table) .getLeafFiles(new Path(s"$location/city=ct1")) - .get.length === 1) + .isDefined) spark.sql(s"use $catalog2").collect() spark.sql(s"insert into $dbTableShortName partition(city='ct2') " + - s"values(21),(22),(23),(24),(25)").collect() + s"values(21),(22),(23),(24),(25)").collect() spark.sql(s"select * from $cat2Table where city='ct2'").collect() assert(HiveFileStatusCache.getOrCreate(spark, cat2Table) .getLeafFiles(new Path(s"$location/city=ct1")).isEmpty) assert(HiveFileStatusCache.getOrCreate(spark, cat2Table) .getLeafFiles(new Path(s"$location/city=ct2")) - .get.length === 1) + .isDefined) } } -} \ No newline at end of file +} From c43d9c5e1348d856c72b56af1686d516c065e616 Mon Sep 17 00:00:00 2001 From: tian bao <2011xuesong@gmail.com> Date: Fri, 10 Oct 2025 17:12:17 +0800 Subject: [PATCH 5/9] format code --- .../kyuubi/spark/connector/hive/write/HiveBatchWrite.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala index 9b35556cffb..53b3848bd0d 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/write/HiveBatchWrite.scala @@ -18,6 +18,7 @@ package org.apache.kyuubi.spark.connector.hive.write import scala.util.control.NonFatal + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf @@ -26,13 +27,14 @@ import org.apache.spark.internal.io.FileCommitProtocol import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.connector.catalog.Identifier import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, PhysicalWriteInfo, WriterCommitMessage} import org.apache.spark.sql.execution.datasources.{WriteJobDescription, WriteTaskResult} import org.apache.spark.sql.execution.datasources.v2.FileBatchWrite import 
org.apache.spark.sql.hive.kyuubi.connector.HiveBridgeHelper.toSQLValue import org.apache.spark.sql.types.StringType + import org.apache.kyuubi.spark.connector.hive.{HiveConnectorUtils, HiveTableCatalog, KyuubiHiveConnectorException} -import org.apache.spark.sql.connector.catalog.Identifier class HiveBatchWrite( sparkSession: SparkSession, @@ -69,8 +71,7 @@ class HiveBatchWrite( // un-cache this table. hiveTableCatalog.catalog.invalidateCachedTable(table.identifier) hiveTableCatalog.invalidateTable( - Identifier.of(Array(table.identifier.database.getOrElse("")), table.identifier.table) - ) + Identifier.of(Array(table.identifier.database.getOrElse("")), table.identifier.table)) val catalog = hiveTableCatalog.catalog if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) { From 2488479b0ac34ce707582496cfdccb6484ee7029 Mon Sep 17 00:00:00 2001 From: tian bao <2011xuesong@gmail.com> Date: Fri, 10 Oct 2025 17:39:10 +0800 Subject: [PATCH 6/9] fix code style --- .../kyuubi/spark/connector/hive/read/HiveFileIndex.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala index 66074f0e5bb..a82d61a0b11 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileIndex.scala @@ -48,7 +48,8 @@ class HiveCatalogFileIndex( private val partPathToBindHivePart: mutable.Map[PartitionPath, CatalogTablePartition] = mutable.Map() - private val fileStatusCache = HiveFileStatusCache.getOrCreate(sparkSession, + private val fileStatusCache = HiveFileStatusCache.getOrCreate( + sparkSession, hiveCatalog.name() + "." 
+ catalogTable.qualifiedName) private val baseLocation: Option[URI] = table.storage.locationUri From ad428cb7de7833bd4207785ff5154cd587c05933 Mon Sep 17 00:00:00 2001 From: tian bao <2011xuesong@gmail.com> Date: Sat, 11 Oct 2025 11:47:24 +0800 Subject: [PATCH 7/9] fix failed tests --- .../hive/HiveFileStatusCacheSuite.scala | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala index 532095087e1..2c04c70c8dc 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala @@ -255,35 +255,35 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { val catalog2 = "hive2" val dbName = "default" val tbName = "tbl_partition" - val dbTableShortName = s"${dbName}.${tbName}" - val cat1Table = s"${catalog1}.${dbTableShortName}" - val cat2Table = s"${catalog2}.${dbTableShortName}" + val cat1Table = s"${catalog1}.${dbName}.${tbName}" + val cat2Table = s"${catalog2}.${dbName}.${tbName}" withTable(cat1Table, cat2Table) { spark.sql(s"CREATE TABLE IF NOT EXISTS $cat1Table (age int)partitioned by(city string)" + s" stored as orc").collect() + spark.sql(s"CREATE TABLE IF NOT EXISTS $cat2Table (age int)partitioned by(city string)" + + s" stored as orc").collect() + val location = newCatalog() .loadTable(Identifier.of(Array(dbName), tbName)) .asInstanceOf[HiveTable] .catalogTable.location.toString - spark.sql(s"use $catalog1").collect() - spark.sql(s"insert into $dbTableShortName partition(city='ct1') " + + spark.sql(s"insert into $cat1Table partition(city='ct1') " + s"values(11),(12),(13),(14),(15)").collect() spark.sql(s"select * from $cat1Table where city='ct1'").collect() assert(HiveFileStatusCache.getOrCreate(spark, cat1Table) - .getLeafFiles(new Path(s"$location/city=ct1")) - .isDefined) + .getLeafFiles(new Path(s"$location/city=ct1")).isDefined) + assert(HiveFileStatusCache.getOrCreate(spark, cat2Table) + .getLeafFiles(new Path(s"$location/city=ct1")).isEmpty) - spark.sql(s"use $catalog2").collect() - spark.sql(s"insert into $dbTableShortName partition(city='ct2') " + + spark.sql(s"insert into $cat2Table partition(city='ct2') " + s"values(21),(22),(23),(24),(25)").collect() spark.sql(s"select * from $cat2Table where city='ct2'").collect() + assert(HiveFileStatusCache.getOrCreate(spark, cat1Table) + .getLeafFiles(new Path(s"$location/city=ct2")).isEmpty) assert(HiveFileStatusCache.getOrCreate(spark, cat2Table) - .getLeafFiles(new Path(s"$location/city=ct1")).isEmpty) - assert(HiveFileStatusCache.getOrCreate(spark, cat2Table) - .getLeafFiles(new Path(s"$location/city=ct2")) - .isDefined) + .getLeafFiles(new Path(s"$location/city=ct2")).isDefined) } } } From d4f4d14d8f30b3375c273d8b2be62a00a7c66a26 Mon Sep 17 00:00:00 2001 From: tian bao <2011xuesong@gmail.com> Date: Mon, 13 Oct 2025 10:15:10 +0800 Subject: [PATCH 8/9] fix failed ut --- .../spark/connector/hive/KyuubiHiveConnectorConf.scala | 8 ++++---- .../spark/connector/hive/read/HiveFileStatusCache.scala | 6 +++--- .../spark/connector/hive/HiveFileStatusCacheSuite.scala | 6 +++--- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git 
a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
index e3aff5e5e8a..f50229181ff 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/KyuubiHiveConnectorConf.scala
@@ -60,13 +60,13 @@ object KyuubiHiveConnectorConf {
 
   val HIVE_FILE_STATUS_CACHE_SCOPE =
     buildConf("spark.sql.kyuubi.hive.file.status.cache.scope")
-      .doc("The scope of hive file status cache, global, session and none.")
+      .doc("The scope of the Hive file status cache. Valid values are 'GLOBE' and 'NONE'.")
       .version("1.11.0")
       .stringConf
       .transform(policy => policy.toUpperCase(Locale.ROOT))
       .checkValue(
-        policy => Set("SESSION", "NONE").contains(policy),
+        policy => Set("GLOBE", "NONE").contains(policy),
         "Invalid value for 'spark.sql.kyuubi.hive.file.status.cache.scope'." +
-        "Valid values are 'SESSION', 'NONE'.")
-      .createWithDefault("SESSION")
+        " Valid values are 'GLOBE', 'NONE'.")
+      .createWithDefault("GLOBE")
 }
diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala
index 6a86e0d7224..6cfef9e4c37 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/read/HiveFileStatusCache.scala
@@ -37,12 +37,12 @@ import org.apache.kyuubi.spark.connector.hive.KyuubiHiveConnectorConf.HIVE_FILE_
  * Because the original FileStatusCache cannot take effect (see https://github.com/apache/kyuubi
  * /issues/7192).
  *
- * The main modification point is that at the session level, the cache key is the qualified name
+ * The main modification point is that at the global level, the cache key is the qualified name
  * of the table (in the form of `catalog.database.table`) + path. The previous key was an
  * object + path generated during initialization, and the current scenario is that FileStatusCache
  * is not preserved by the outside, resulting in different keys and ineffective caching.
  *
- * Use [[HiveFileStatusCache.getOrCreate()]] to construct a session/none shared file status cache.
+ * Use [[HiveFileStatusCache.getOrCreate()]] to construct a global or no-op file status cache.
*/ object HiveFileStatusCache { private var sharedCache: HiveSharedInMemoryCache = _ @@ -61,7 +61,7 @@ object HiveFileStatusCache { session.sessionState.conf.metadataCacheTTL) } conf.getConf(HIVE_FILE_STATUS_CACHE_SCOPE) match { - case "SESSION" => sharedCache.createForNewClient(qualifiedName) + case "GLOBE" => sharedCache.createForNewClient(qualifiedName) case "NONE" => NoopCache } } else { diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala index 2c04c70c8dc..f553ccaafe7 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/test/scala/org/apache/kyuubi/spark/connector/hive/HiveFileStatusCacheSuite.scala @@ -33,7 +33,7 @@ import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache class HiveFileStatusCacheSuite extends KyuubiHiveTest { test("use different cache scope") { - Seq("SESSION", "NONE").foreach { value => + Seq("GLOBE", "NONE").foreach { value => withSparkSession(Map(HIVE_FILE_STATUS_CACHE_SCOPE.key -> value)) { _ => val path = new Path("/dummy_tmp", "abc") val files = (1 to 3).map(_ => new FileStatus()) @@ -44,13 +44,13 @@ class HiveFileStatusCacheSuite extends KyuubiHiveTest { value match { // Exactly 3 files are cached. - case "SESSION" => + case "GLOBE" => assert(fileStatusCacheTabel.getLeafFiles(path).get.length === 3) case "NONE" => assert(fileStatusCacheTabel.getLeafFiles(path).isEmpty) case _ => throw new IllegalArgumentException( - s"Unexpected value: '$value'. Only 'SESSION' or 'NONE' are allowed.") + s"Unexpected value: '$value'. 
Only 'GLOBE' or 'NONE' are allowed.") } fileStatusCacheTabel.invalidateAll() From b5aaec0176aed9fcab33b6dd0d1b14c5531b954f Mon Sep 17 00:00:00 2001 From: tian bao <2011xuesong@gmail.com> Date: Mon, 13 Oct 2025 23:20:50 +0800 Subject: [PATCH 9/9] fix altertable catalogTable copy not support spark v4.0 --- .../connector/hive/HiveTableCatalog.scala | 28 ++++++++++++++----- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala index 89a49f352ed..5dbc89533df 100644 --- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala +++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveTableCatalog.scala @@ -378,13 +378,27 @@ class HiveTableCatalog(sparkSession: SparkSession) } try { - catalog.alterTable( - catalogTable.copy( - properties = properties, - schema = schema, - owner = owner, - comment = comment, - storage = storage)) + catalog.alterTable(newCatalogTable( + identifier = catalogTable.identifier, + tableType = catalogTable.tableType, + storage = storage, + schema = schema, + provider = catalogTable.provider, + partitionColumnNames = catalogTable.partitionColumnNames, + bucketSpec = catalogTable.bucketSpec, + owner = owner, + createTime = catalogTable.createTime, + lastAccessTime = catalogTable.lastAccessTime, + createVersion = catalogTable.createVersion, + properties = properties, + stats = catalogTable.stats, + viewText = catalogTable.viewText, + comment = comment, + unsupportedFeatures = catalogTable.unsupportedFeatures, + tracksPartitionsInCatalog = catalogTable.tracksPartitionsInCatalog, + schemaPreservesCase = catalogTable.schemaPreservesCase, + ignoredProperties = catalogTable.ignoredProperties, + viewOriginalText = catalogTable.viewOriginalText)) } catch { case _: NoSuchTableException => throw new NoSuchTableException(ident)
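
A quick usage note for reviewers trying the series end to end: the sketch below is illustrative only and is not part of the patches; the local-mode session, the `hive` catalog name, and the `default.tbl_partition` table are assumptions borrowed from the test suite. It shows how the new `spark.sql.kyuubi.hive.file.status.cache.scope` setting interacts with the per-table cache key described above.

    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.spark.sql.SparkSession

    import org.apache.kyuubi.spark.connector.hive.read.HiveFileStatusCache

    // Start a session with the cache disabled. The default scope is "GLOBE",
    // which shares one in-memory cache across all HiveCatalogFileIndex instances.
    val spark = SparkSession.builder()
      .master("local[1]")
      .config("spark.sql.kyuubi.hive.file.status.cache.scope", "NONE")
      .getOrCreate()

    // The cache key combines the fully qualified table name (catalog.database.table)
    // with the listed path, so two catalogs over the same metastore stay isolated.
    val cache = HiveFileStatusCache.getOrCreate(spark, "hive.default.tbl_partition")

    val partitionPath = new Path("/tmp/tbl_partition/city=ct1")
    cache.putLeafFiles(partitionPath, Array.empty[FileStatus])

    // With scope NONE, getOrCreate returns Spark's NoopCache, so nothing is retained.
    assert(cache.getLeafFiles(partitionPath).isEmpty)

With the default GLOBE scope the same lookup returns the cached file statuses until the entry is evicted by the size limit or the metadata cache TTL, which is what HiveFileStatusCacheSuite exercises across the two catalogs sharing one table location.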