Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import org.apache.paimon.spark.catalog.functions.V1FunctionConverter;
import org.apache.paimon.spark.utils.CatalogUtils;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.iceberg.IcebergTable;
import org.apache.paimon.table.lance.LanceTable;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.types.BlobType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
Expand Down Expand Up @@ -649,13 +652,19 @@ protected org.apache.spark.sql.connector.catalog.Table loadSparkTable(
Identifier ident, Map<String, String> extraOptions) throws NoSuchTableException {
try {
org.apache.paimon.catalog.Identifier tblIdent = toIdentifier(ident, catalogName);
org.apache.paimon.table.Table paimonTable =
org.apache.paimon.table.Table table =
copyWithSQLConf(
catalog.getTable(tblIdent), catalogName, tblIdent, extraOptions);
if (paimonTable instanceof FormatTable) {
return toSparkFormatTable(ident, (FormatTable) paimonTable);
if (table instanceof FormatTable) {
return toSparkFormatTable(ident, (FormatTable) table);
} else if (table instanceof IcebergTable) {
return new SparkIcebergTable(table);
} else if (table instanceof LanceTable) {
return new SparkLanceTable(table);
} else if (table instanceof ObjectTable) {
return new SparkObjectTable(table);
} else {
return new SparkTable(paimonTable);
return new SparkTable(table);
}
} catch (Catalog.TableNotExistException e) {
throw new NoSuchTableException(ident);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,11 @@ package org.apache.paimon.spark
import org.apache.paimon.table.Table
import org.apache.paimon.utils.StringUtils

import org.apache.spark.sql.connector.catalog.TableCapability
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.types.StructType

import java.util.{Map => JMap}
import java.util.{Collections => JCollections, Map => JMap, Set => JSet}

import scala.collection.JavaConverters._

Expand All @@ -34,6 +35,8 @@ abstract class BaseTable

val table: Table

override def capabilities(): JSet[TableCapability] = JCollections.emptySet[TableCapability]()

override def name: String = table.fullName

override lazy val schema: StructType = SparkTypeUtils.fromPaimonRowType(table.rowType)
Expand All @@ -42,9 +45,7 @@ abstract class BaseTable
table.partitionKeys().asScala.map(p => Expressions.identity(StringUtils.quote(p))).toArray
}

override def properties: JMap[String, String] = {
table.options()
}
override def properties: JMap[String, String] = table.options()

override def toString: String = {
s"${table.getClass.getSimpleName}[${table.fullName()}]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,9 @@ import org.apache.paimon.table.Table

/**
 * The default Spark [[org.apache.spark.sql.connector.catalog.Table]] implementation for a
 * Paimon [[Table]], inheriting its read/write behavior from [[PaimonSparkTableBase]].
 */
case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table)

case class SparkIcebergTable(table: Table) extends BaseTable

case class SparkLanceTable(table: Table) extends BaseTable

case class SparkObjectTable(table: Table) extends BaseTable