diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
index b9a5028acdc..d4fdca91912 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
@@ -41,11 +41,12 @@ object SparkCatalogUtils extends Logging {
   val sparkTableTypes: Set[String] = Set(VIEW, TABLE)
 
   // ///////////////////////////////////////////////////////////////////////////////////////////////
-  //                                          Catalog                                             //
+  //                                             Catalog                                          //
   // ///////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Get all register catalogs in Spark's `CatalogManager`
+   * Get all registered catalogs in Spark's `CatalogManager`.
+   * Note that the result only contains loaded catalogs because catalogs are lazily loaded in Spark.
    */
   def getCatalogs(spark: SparkSession): Seq[Row] = {
 
@@ -70,7 +70,7 @@ object SparkCatalogUtils extends Logging {
   }
 
   def setCurrentCatalog(spark: SparkSession, catalog: String): Unit = {
-    // SPARK-36841(3.3.0) Ensure setCurrentCatalog method catalog must exist
+    // SPARK-36841 (3.3.0): setCurrentCatalog requires the target catalog to exist, so check first
     if (spark.sessionState.catalogManager.isCatalogRegistered(catalog)) {
       spark.sessionState.catalogManager.setCurrentCatalog(catalog)
     } else {
@@ -78,21 +78,26 @@
     }
   }
 
+  // SPARK-50700 (4.0.0) introduces the `builtin` magic value for the default session catalog
+  private def hasCustomSessionCatalog(spark: SparkSession): Boolean = {
+    spark.conf.get(s"spark.sql.catalog.$SESSION_CATALOG", "builtin") != "builtin"
+  }
+
   // ///////////////////////////////////////////////////////////////////////////////////////////////
-  //                                           Schema                                             //
+  //                                             Schema                                           //
   // ///////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
-   * a list of [[Row]]s, with 2 fields `schemaName: String, catalogName: String`
+   * Return a list of [[Row]]s, each with 2 fields: `schemaName: String, catalogName: String`
    */
   def getSchemas(
       spark: SparkSession,
       catalogName: String,
       schemaPattern: String): Seq[Row] = {
-    if (catalogName == SparkCatalogUtils.SESSION_CATALOG) {
-      (spark.sessionState.catalog.listDatabases(schemaPattern) ++
-        getGlobalTempViewManager(spark, schemaPattern))
-        .map(Row(_, SparkCatalogUtils.SESSION_CATALOG))
+    if (catalogName == SESSION_CATALOG && !hasCustomSessionCatalog(spark)) {
+      val dbs = spark.sessionState.catalog.listDatabases(schemaPattern)
+      val globalTempDb = getGlobalTempViewManager(spark, schemaPattern)
+      (dbs ++ globalTempDb).map(Row(_, SESSION_CATALOG))
     } else {
       val catalog = getCatalog(spark, catalogName)
       getSchemasWithPattern(catalog, schemaPattern).map(Row(_, catalog.name))
@@ -162,6 +167,21 @@ object SparkCatalogUtils extends Logging {
     val catalog = getCatalog(spark, catalogName)
     val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
     catalog match {
+      case tc: TableCatalog =>
+        val tp = tablePattern.r.pattern
+        val identifiers = namespaces.flatMap { ns =>
+          tc.listTables(ns).filter(i => tp.matcher(quoteIfNeeded(i.name())).matches())
+        }
+        identifiers.map { ident =>
+          // TODO: restore view type for session catalog
+          val comment = if (ignoreTableProperties) ""
+          else { // loading the table is a time-consuming operation
+            tc.loadTable(ident).properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
+          }
+          val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
+          val tableName = quoteIfNeeded(ident.name())
+          Row(catalog.name(), schema, tableName, TABLE, comment, null, null, null, null, null)
+        }
       case builtin if builtin.name() == SESSION_CATALOG =>
         val sessionCatalog = spark.sessionState.catalog
         val databases = sessionCatalog.listDatabases(schemaPattern)
@@ -206,21 +226,6 @@
             }
           }
         }
-      case tc: TableCatalog =>
-        val tp = tablePattern.r.pattern
-        val identifiers = namespaces.flatMap { ns =>
-          tc.listTables(ns).filter(i => tp.matcher(quoteIfNeeded(i.name())).matches())
-        }
-        identifiers.map { ident =>
-          // TODO: restore view type for session catalog
-          val comment = if (ignoreTableProperties) ""
-          else { // load table is a time consuming operation
-            tc.loadTable(ident).properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
-          }
-          val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
-          val tableName = quoteIfNeeded(ident.name())
-          Row(catalog.name(), schema, tableName, TABLE, comment, null, null, null, null, null)
-        }
       case _ => Seq.empty[Row]
     }
   }
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/DeltaSuiteMixin.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/DeltaSuiteMixin.scala
index 2ebad55f5f8..34b0aa419d3 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/DeltaSuiteMixin.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/DeltaSuiteMixin.scala
@@ -21,19 +21,19 @@ import java.nio.file.Path
 
 trait DeltaSuiteMixin extends DataLakeSuiteMixin {
 
-  override protected def format: String = "delta"
+  override protected val format: String = "delta"
 
-  override protected def catalog: String = "spark_catalog"
+  override protected val catalog: String = "spark_catalog"
 
-  override protected def warehouse: Path = Utils.createTempDir()
+  override protected val warehouse: Path = Utils.createTempDir()
 
-  override protected def extraJars: String = {
+  override protected val extraJars: String = {
     System.getProperty("java.class.path")
       .split(":")
       .filter(_.contains("io/delta/delta")).mkString(",")
   }
 
-  override protected def extraConfigs = Map(
+  override protected val extraConfigs: Map[String, String] = Map(
     "spark.sql.catalogImplementation" -> "in-memory",
     "spark.sql.defaultCatalog" -> catalog,
     "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/IcebergSuiteMixin.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/IcebergSuiteMixin.scala
index a90f834efa5..8f0b7d796ea 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/IcebergSuiteMixin.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/IcebergSuiteMixin.scala
@@ -21,19 +21,19 @@ import java.nio.file.Path
 
 trait IcebergSuiteMixin extends DataLakeSuiteMixin {
 
-  override protected def format: String = "iceberg"
+  override protected val format: String = "iceberg"
 
-  override protected def catalog: String = "hadoop_prod"
+  override protected val catalog: String = "hadoop_prod"
 
-  override protected def warehouse: Path = Utils.createTempDir()
+  override protected val warehouse: Path = Utils.createTempDir()
 
-  override protected def extraJars: String = {
+  override protected val extraJars: String = {
     System.getProperty("java.class.path")
       .split(":")
       .filter(_.contains("iceberg-spark")).head
   }
 
-  override protected def extraConfigs = Map(
+  override protected val extraConfigs: Map[String, String] = Map(
     "spark.sql.catalogImplementation" -> "in-memory",
     "spark.sql.defaultCatalog" -> catalog,
"spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala index 814c08343d0..6f9bd9d8aa4 100644 --- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala +++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.operation import scala.collection.mutable.ListBuffer -import org.apache.kyuubi.{IcebergSuiteMixin, SPARK_COMPILE_VERSION} +import org.apache.kyuubi.IcebergSuiteMixin import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._ import org.apache.kyuubi.util.AssertionUtils._ import org.apache.kyuubi.util.SparkVersionUtil @@ -51,12 +51,9 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit checkGetSchemas(metaData.getSchemas("spark_catalog", pattern), dbDflts, "spark_catalog") } - Seq(null, catalog).foreach { cg => + Seq("spark_catalog", catalog).foreach { cg => matchAllPatterns foreach { pattern => - checkGetSchemas( - metaData.getSchemas(cg, pattern), - dbs, - catalog) + checkGetSchemas(metaData.getSchemas(cg, pattern), dbs, catalog) } } @@ -87,7 +84,7 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db")) val metaData = statement.getConnection.getMetaData - Seq(null, catalog).foreach { cg => + Seq("spark_catalog", catalog).foreach { cg => matchAllPatterns foreach { pattern => checkGetSchemas( metaData.getSchemas(cg, pattern), @@ -116,7 +113,7 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db")) val metaData = statement.getConnection.getMetaData - Seq(catalog).foreach { cg => + Seq("spark_catalog", catalog).foreach { cg => dbs.foreach { db => try { statement.execute( @@ -156,12 +153,9 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit "map", "date", "timestamp", - // SPARK-37931 - if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") "struct" - else "struct<`X`: bigint, `Y`: double>", + "struct", "binary", - // SPARK-37931 - if (SPARK_COMPILE_VERSION >= "3.3") "struct" else "struct<`X`: string>") + "struct") val cols = dataTypes.zipWithIndex.map { case (dt, idx) => s"c$idx" -> dt } val (colNames, _) = cols.unzip