From 524c202a9c946166af259c746231dbc31d64c632 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Thu, 15 May 2025 21:35:51 +0800
Subject: [PATCH 1/2] Fix thrift GetSchemas/GetTables ops when a custom
 SessionCatalog is configured

---
 .../engine/spark/repl/DataFrameHolder.scala   |  4 +-
 .../engine/spark/util/SparkCatalogUtils.scala | 53 ++++++++++---------
 .../org/apache/kyuubi/DeltaSuiteMixin.scala   | 10 ++--
 .../org/apache/kyuubi/IcebergSuiteMixin.scala | 10 ++--
 .../operation/IcebergMetadataTests.scala      | 20 +++----
 5 files changed, 48 insertions(+), 49 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/DataFrameHolder.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/DataFrameHolder.scala
index 17d6ae02eff..6b6fe955b8e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/DataFrameHolder.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/DataFrameHolder.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.engine.spark.repl
 
-import java.util.HashMap
+import java.util
 
 import org.apache.spark.kyuubi.SparkContextHelper
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -29,7 +29,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
  */
 class DataFrameHolder(spark: SparkSession) {
 
-  private val results = new HashMap[String, DataFrame]()
+  private val results = new util.HashMap[String, DataFrame]()
 
   private def currentId: String = {
     SparkContextHelper.getCurrentStatementId(spark.sparkContext)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
index b9a5028acdc..d4fdca91912 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/util/SparkCatalogUtils.scala
@@ -41,11 +41,11 @@ object SparkCatalogUtils extends Logging {
   val sparkTableTypes: Set[String] = Set(VIEW, TABLE)
 
   // ///////////////////////////////////////////////////////////////////////////////////////////////
-  // Catalog                                                                                       //
+  //                                             Catalog                                          //
   // ///////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
-   * Get all register catalogs in Spark's `CatalogManager`
+   * Note that the result only contains loaded catalogs because catalogs are lazily loaded in Spark.
    */
   def getCatalogs(spark: SparkSession): Seq[Row] = {
 
@@ -70,7 +70,7 @@ object SparkCatalogUtils extends Logging {
   }
 
   def setCurrentCatalog(spark: SparkSession, catalog: String): Unit = {
-    // SPARK-36841(3.3.0) Ensure setCurrentCatalog method catalog must exist
+    // SPARK-36841 (3.3.0) Ensure setCurrentCatalog method catalog must exist
     if (spark.sessionState.catalogManager.isCatalogRegistered(catalog)) {
       spark.sessionState.catalogManager.setCurrentCatalog(catalog)
     } else {
@@ -78,21 +78,26 @@ object SparkCatalogUtils extends Logging {
     }
   }
 
+  // SPARK-50700 (4.0.0) adds the `builtin` magic value
+  private def hasCustomSessionCatalog(spark: SparkSession): Boolean = {
+    spark.conf.get(s"spark.sql.catalog.$SESSION_CATALOG", "builtin") != "builtin"
+  }
+
   // ///////////////////////////////////////////////////////////////////////////////////////////////
-  // Schema                                                                                        //
+  //                                              Schema                                           //
   // ///////////////////////////////////////////////////////////////////////////////////////////////
 
   /**
-   * a list of [[Row]]s, with 2 fields `schemaName: String, catalogName: String`
+   * Return a list of [[Row]]s, with 2 fields `schemaName: String, catalogName: String`
    */
   def getSchemas(
       spark: SparkSession,
       catalogName: String,
       schemaPattern: String): Seq[Row] = {
-    if (catalogName == SparkCatalogUtils.SESSION_CATALOG) {
-      (spark.sessionState.catalog.listDatabases(schemaPattern) ++
-        getGlobalTempViewManager(spark, schemaPattern))
-        .map(Row(_, SparkCatalogUtils.SESSION_CATALOG))
+    if (catalogName == SESSION_CATALOG && !hasCustomSessionCatalog(spark)) {
+      val dbs = spark.sessionState.catalog.listDatabases(schemaPattern)
+      val globalTempDb = getGlobalTempViewManager(spark, schemaPattern)
+      (dbs ++ globalTempDb).map(Row(_, SESSION_CATALOG))
     } else {
       val catalog = getCatalog(spark, catalogName)
       getSchemasWithPattern(catalog, schemaPattern).map(Row(_, catalog.name))
@@ -162,6 +167,21 @@ object SparkCatalogUtils extends Logging {
     val catalog = getCatalog(spark, catalogName)
     val namespaces = listNamespacesWithPattern(catalog, schemaPattern)
     catalog match {
+      case tc: TableCatalog =>
+        val tp = tablePattern.r.pattern
+        val identifiers = namespaces.flatMap { ns =>
+          tc.listTables(ns).filter(i => tp.matcher(quoteIfNeeded(i.name())).matches())
+        }
+        identifiers.map { ident =>
+          // TODO: restore view type for session catalog
+          val comment = if (ignoreTableProperties) ""
+          else { // load table is a time consuming operation
+            tc.loadTable(ident).properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
+          }
+          val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
+          val tableName = quoteIfNeeded(ident.name())
+          Row(catalog.name(), schema, tableName, TABLE, comment, null, null, null, null, null)
+        }
       case builtin if builtin.name() == SESSION_CATALOG =>
         val sessionCatalog = spark.sessionState.catalog
         val databases = sessionCatalog.listDatabases(schemaPattern)
@@ -206,21 +226,6 @@ object SparkCatalogUtils extends Logging {
             }
           }
         }
-      case tc: TableCatalog =>
-        val tp = tablePattern.r.pattern
-        val identifiers = namespaces.flatMap { ns =>
-          tc.listTables(ns).filter(i => tp.matcher(quoteIfNeeded(i.name())).matches())
-        }
-        identifiers.map { ident =>
-          // TODO: restore view type for session catalog
-          val comment = if (ignoreTableProperties) ""
-          else { // load table is a time consuming operation
-            tc.loadTable(ident).properties().getOrDefault(TableCatalog.PROP_COMMENT, "")
-          }
-          val schema = ident.namespace().map(quoteIfNeeded).mkString(".")
-          val tableName = quoteIfNeeded(ident.name())
-          Row(catalog.name(), schema, tableName, TABLE, comment, null, null, null, null, null)
-        }
       case _ => Seq.empty[Row]
     }
   }
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/DeltaSuiteMixin.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/DeltaSuiteMixin.scala
index 2ebad55f5f8..34b0aa419d3 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/DeltaSuiteMixin.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/DeltaSuiteMixin.scala
@@ -21,19 +21,19 @@ import java.nio.file.Path
 
 trait DeltaSuiteMixin extends DataLakeSuiteMixin {
 
-  override protected def format: String = "delta"
+  override protected val format: String = "delta"
 
-  override protected def catalog: String = "spark_catalog"
+  override protected val catalog: String = "spark_catalog"
 
-  override protected def warehouse: Path = Utils.createTempDir()
+  override protected val warehouse: Path = Utils.createTempDir()
 
-  override protected def extraJars: String = {
+  override protected val extraJars: String = {
     System.getProperty("java.class.path")
       .split(":")
       .filter(_.contains("io/delta/delta")).mkString(",")
   }
 
-  override protected def extraConfigs = Map(
+  override protected val extraConfigs: Map[String, String] = Map(
     "spark.sql.catalogImplementation" -> "in-memory",
     "spark.sql.defaultCatalog" -> catalog,
     "spark.sql.extensions" -> "io.delta.sql.DeltaSparkSessionExtension",
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/IcebergSuiteMixin.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/IcebergSuiteMixin.scala
index a90f834efa5..8f0b7d796ea 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/IcebergSuiteMixin.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/IcebergSuiteMixin.scala
@@ -21,19 +21,19 @@ import java.nio.file.Path
 
 trait IcebergSuiteMixin extends DataLakeSuiteMixin {
 
-  override protected def format: String = "iceberg"
+  override protected val format: String = "iceberg"
 
-  override protected def catalog: String = "hadoop_prod"
+  override protected val catalog: String = "hadoop_prod"
 
-  override protected def warehouse: Path = Utils.createTempDir()
+  override protected val warehouse: Path = Utils.createTempDir()
 
-  override protected def extraJars: String = {
+  override protected val extraJars: String = {
     System.getProperty("java.class.path")
       .split(":")
       .filter(_.contains("iceberg-spark")).head
   }
 
-  override protected def extraConfigs = Map(
+  override protected val extraConfigs: Map[String, String] = Map(
     "spark.sql.catalogImplementation" -> "in-memory",
     "spark.sql.defaultCatalog" -> catalog,
     "spark.sql.extensions" -> "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala
index 814c08343d0..6f9bd9d8aa4 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/IcebergMetadataTests.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.operation
 
 import scala.collection.mutable.ListBuffer
 
-import org.apache.kyuubi.{IcebergSuiteMixin, SPARK_COMPILE_VERSION}
+import org.apache.kyuubi.IcebergSuiteMixin
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.util.AssertionUtils._
 import org.apache.kyuubi.util.SparkVersionUtil
@@ -51,12 +51,9 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
         checkGetSchemas(metaData.getSchemas("spark_catalog", pattern), dbDflts, "spark_catalog")
       }
 
-      Seq(null, catalog).foreach { cg =>
+      Seq("spark_catalog", catalog).foreach { cg =>
         matchAllPatterns foreach { pattern =>
-          checkGetSchemas(
-            metaData.getSchemas(cg, pattern),
-            dbs,
-            catalog)
+          checkGetSchemas(metaData.getSchemas(cg, pattern), dbs, catalog)
         }
       }
 
@@ -87,7 +84,7 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
       dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db"))
       val metaData = statement.getConnection.getMetaData
 
-      Seq(null, catalog).foreach { cg =>
+      Seq("spark_catalog", catalog).foreach { cg =>
         matchAllPatterns foreach { pattern =>
           checkGetSchemas(
             metaData.getSchemas(cg, pattern),
@@ -116,7 +113,7 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
       dbs.foreach(db => statement.execute(s"CREATE NAMESPACE IF NOT EXISTS $db"))
       val metaData = statement.getConnection.getMetaData
 
-      Seq(catalog).foreach { cg =>
+      Seq("spark_catalog", catalog).foreach { cg =>
         dbs.foreach { db =>
           try {
             statement.execute(
@@ -156,12 +153,9 @@ trait IcebergMetadataTests extends HiveJDBCTestHelper with IcebergSuiteMixin wit
       "map<int, bigint>",
       "date",
       "timestamp",
-      // SPARK-37931
-      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.3") "struct<X: bigint, Y: double>"
-      else "struct<`X`: bigint, `Y`: double>",
+      "struct<X: bigint, Y: double>",
       "binary",
-      // SPARK-37931
-      if (SPARK_COMPILE_VERSION >= "3.3") "struct<X: string>" else "struct<`X`: string>")
+      "struct<X: string>")
     val cols = dataTypes.zipWithIndex.map { case (dt, idx) => s"c$idx" -> dt }
     val (colNames, _) = cols.unzip
 

From ce88df2cac8886fa631b942c6413cfb08aece6ad Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Thu, 15 May 2025 21:36:31 +0800
Subject: [PATCH 2/2] nit

---
 .../org/apache/kyuubi/engine/spark/repl/DataFrameHolder.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/DataFrameHolder.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/DataFrameHolder.scala
index 6b6fe955b8e..17d6ae02eff 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/DataFrameHolder.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/repl/DataFrameHolder.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.engine.spark.repl
 
-import java.util
+import java.util.HashMap
 
 import org.apache.spark.kyuubi.SparkContextHelper
 import org.apache.spark.sql.{DataFrame, SparkSession}
@@ -29,7 +29,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
  */
 class DataFrameHolder(spark: SparkSession) {
 
-  private val results = new util.HashMap[String, DataFrame]()
+  private val results = new HashMap[String, DataFrame]()
 
   private def currentId: String = {
     SparkContextHelper.getCurrentStatementId(spark.sparkContext)