diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 889adf5a86c4..0698b72f0f4f 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -548,6 +548,12 @@
             <td>Boolean</td>
             <td>Whether to force the use of lookup for compaction.</td>
         </tr>
+        <tr>
+            <td><h5>format-table.commit-hive-sync-url</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Hive metastore URI used to sync partitions when committing to a format table.</td>
+        </tr>
         <tr>
             <td><h5>format-table.file.compression</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 590b7b6dd894..464082cf6e75 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2030,6 +2030,11 @@ public InlineElement getDescription() {
                     .noDefaultValue()
                     .withFallbackKeys(FILE_COMPRESSION.key())
                     .withDescription("Format table file compression.");
+    public static final ConfigOption<String> FORMAT_TABLE_COMMIT_HIVE_SYNC_URI =
+            ConfigOptions.key("format-table.commit-hive-sync-url")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Hive metastore URI used to sync partitions when committing to a format table.");

     public static final ConfigOption<String> BLOB_FIELD =
             key("blob-field")
@@ -2326,6 +2331,10 @@ public String formatTableFileCompression() {
         }
     }

+    public String formatTableCommitSyncPartitionHiveUri() {
+        return options.get(FORMAT_TABLE_COMMIT_HIVE_SYNC_URI);
+    }
+
     public MemorySize fileReaderAsyncThreshold() {
         return options.get(FILE_READER_ASYNC_THRESHOLD);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 3517c18f3c85..138769f0c910 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -239,7 +239,7 @@ public static Table loadTable(
         Function<Path, FileIO> dataFileIO = metadata.isExternal() ? externalFileIO : internalFileIO;

         if (options.type() == TableType.FORMAT_TABLE) {
-            return toFormatTable(identifier, schema, dataFileIO);
+            return toFormatTable(identifier, schema, dataFileIO, catalogContext);
         }

         if (options.type() == TableType.OBJECT_TABLE) {
@@ -379,7 +379,10 @@ private static Table createSystemTable(Identifier identifier, Table originTable) {
     }

     private static FormatTable toFormatTable(
-            Identifier identifier, TableSchema schema, Function<Path, FileIO> fileIO) {
+            Identifier identifier,
+            TableSchema schema,
+            Function<Path, FileIO> fileIO,
+            CatalogContext catalogContext) {
         Map<String, String> options = schema.options();
         FormatTable.Format format =
                 FormatTable.parseFormat(
@@ -396,6 +399,7 @@ private static FormatTable toFormatTable(
                         .format(format)
                         .options(options)
                         .comment(schema.comment())
+                        .catalogContext(catalogContext)
                         .build();
     }
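The three hunks above introduce the new option, expose it through CoreOptions, and start threading the CatalogContext toward format-table loading. As a hedged sketch of the read path, using only APIs that appear in this patch (the options map and thrift URI below are illustrative, not defaults):

```java
import org.apache.paimon.CoreOptions;

import java.util.HashMap;
import java.util.Map;

public class SyncUriOptionExample {
    public static void main(String[] args) {
        // Table options as they would arrive from the table schema.
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("format-table.commit-hive-sync-url", "thrift://localhost:9083");

        CoreOptions options = new CoreOptions(tableOptions);
        // Prints the configured URI; returns null when the option is unset,
        // which is the signal to skip Hive partition sync entirely.
        System.out.println(options.formatTableCommitSyncPartitionHiveUri());
    }
}
```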
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index 2963dbb04d74..1e481fee452e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -20,6 +20,7 @@

 import org.apache.paimon.Snapshot;
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.manifest.IndexManifestEntry;
@@ -68,6 +69,8 @@ public interface FormatTable extends Table {
     @Override
     FormatTable copy(Map<String, String> dynamicOptions);

+    CatalogContext catalogContext();
+
     /** Currently supported formats. */
     enum Format {
         ORC,
@@ -105,6 +108,7 @@ class Builder {
         private Format format;
         private Map<String, String> options;
         @Nullable private String comment;
+        private CatalogContext catalogContext;

         public Builder fileIO(FileIO fileIO) {
             this.fileIO = fileIO;
@@ -146,9 +150,22 @@ public Builder comment(@Nullable String comment) {
             return this;
         }

+        public Builder catalogContext(CatalogContext catalogContext) {
+            this.catalogContext = catalogContext;
+            return this;
+        }
+
         public FormatTable build() {
             return new FormatTableImpl(
-                    fileIO, identifier, rowType, partitionKeys, location, format, options, comment);
+                    fileIO,
+                    identifier,
+                    rowType,
+                    partitionKeys,
+                    location,
+                    format,
+                    options,
+                    comment,
+                    catalogContext);
         }
     }

@@ -165,6 +182,7 @@ class FormatTableImpl implements FormatTable {
         private final Format format;
         private final Map<String, String> options;
         @Nullable private final String comment;
+        private CatalogContext catalogContext;

         public FormatTableImpl(
                 FileIO fileIO,
@@ -174,7 +192,8 @@ public FormatTableImpl(
                 String location,
                 Format format,
                 Map<String, String> options,
-                @Nullable String comment) {
+                @Nullable String comment,
+                CatalogContext catalogContext) {
             this.fileIO = fileIO;
             this.identifier = identifier;
             this.rowType = rowType;
@@ -183,6 +202,7 @@ public FormatTableImpl(
             this.format = format;
             this.options = options;
             this.comment = comment;
+            this.catalogContext = catalogContext;
         }

         @Override
@@ -247,7 +267,13 @@ public FormatTable copy(Map<String, String> dynamicOptions) {
                     location,
                     format,
                     newOptions,
-                    comment);
+                    comment,
+                    catalogContext);
+        }
+
+        @Override
+        public CatalogContext catalogContext() {
+            return this.catalogContext;
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
index 78fef13ab676..924e67dbdfdc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table.format;

 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
@@ -74,15 +75,19 @@ public BatchTableWrite newWrite() {

     @Override
     public BatchTableCommit newCommit() {
-        boolean formatTablePartitionOnlyValueInPath =
-                (new CoreOptions(table.options())).formatTablePartitionOnlyValueInPath();
+        CoreOptions options = new CoreOptions(table.options());
+        boolean formatTablePartitionOnlyValueInPath = options.formatTablePartitionOnlyValueInPath();
+        String syncHiveUri = options.formatTableCommitSyncPartitionHiveUri();
         return new FormatTableCommit(
                 table.location(),
                 table.partitionKeys(),
                 table.fileIO(),
                 formatTablePartitionOnlyValueInPath,
                 overwrite,
-                staticPartition);
+                Identifier.fromString(table.fullName()),
+                staticPartition,
+                syncHiveUri,
+                table.catalogContext());
     }

     @Override
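newCommit() is now the single place where the sync URI, table identifier, and catalog context are wired into FormatTableCommit. A hedged sketch of the calling side, assuming a partitioned FormatTable whose schema is (f0 INT, ds BIGINT) and whose options carry the sync URL; names and values are illustrative:

```java
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;

import java.util.List;

public class FormatTableWriteExample {
    // Writes one row into partition ds=2024 and commits it. When
    // format-table.commit-hive-sync-url is set on the table, the commit call
    // below also registers the new partition in the Hive metastore.
    public static void writeAndCommit(FormatTable table) throws Exception {
        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
        try (BatchTableWrite write = writeBuilder.newWrite()) {
            write.write(GenericRow.of(1, 2024L));
            List<CommitMessage> messages = write.prepareCommit();
            try (BatchTableCommit commit = writeBuilder.newCommit()) {
                commit.commit(messages);
            }
        }
    }
}
```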
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
index 5ff58d8b9a83..80182f82468d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableCommit.java
@@ -18,22 +18,31 @@

 package org.apache.paimon.table.format;

+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.TwoPhaseOutputStream;
 import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.TableCommit;
+import org.apache.paimon.utils.PartitionPathUtils;

 import javax.annotation.Nullable;

 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -50,6 +59,8 @@ public class FormatTableCommit implements BatchTableCommit {
     private List<String> partitionKeys;
     protected Map<String, String> staticPartitions;
     protected boolean overwrite = false;
+    private Catalog hiveCatalog;
+    private Identifier tableIdentifier;

     public FormatTableCommit(
             String location,
@@ -57,7 +68,10 @@ public FormatTableCommit(
             FileIO fileIO,
             boolean formatTablePartitionOnlyValueInPath,
             boolean overwrite,
-            @Nullable Map<String, String> staticPartitions) {
+            Identifier tableIdentifier,
+            @Nullable Map<String, String> staticPartitions,
+            @Nullable String syncHiveUri,
+            CatalogContext catalogContext) {
         this.location = location;
         this.fileIO = fileIO;
         this.formatTablePartitionOnlyValueInPath = formatTablePartitionOnlyValueInPath;
@@ -65,6 +79,22 @@ public FormatTableCommit(
         this.staticPartitions = staticPartitions;
         this.overwrite = overwrite;
         this.partitionKeys = partitionKeys;
+        this.tableIdentifier = tableIdentifier;
+        if (syncHiveUri != null) {
+            try {
+                Options options = new Options();
+                options.set(CatalogOptions.URI, syncHiveUri);
+                options.set(CatalogOptions.METASTORE, "hive");
+                CatalogContext context =
+                        CatalogContext.create(options, catalogContext.hadoopConf());
+                this.hiveCatalog = CatalogFactory.createCatalog(context);
+            } catch (Exception e) {
+                throw new RuntimeException(
+                        String.format(
+                                "Failed to initialize Hive catalog with URI: %s", syncHiveUri),
+                        e);
+            }
+        }
     }

     @Override
@@ -81,6 +111,8 @@ public void commit(List<CommitMessage> commitMessages) {
             }
         }

+        Set<Map<String, String>> partitionSpecs = new HashSet<>();
+
         if (staticPartitions != null && !staticPartitions.isEmpty()) {
             Path partitionPath =
                     buildPartitionPath(
@@ -88,7 +120,7 @@ public void commit(List<CommitMessage> commitMessages) {
                             staticPartitions,
                             formatTablePartitionOnlyValueInPath,
                             partitionKeys);
-
+            partitionSpecs.add(staticPartitions);
             if (overwrite) {
                 deletePreviousDataFile(partitionPath);
             }
@@ -107,10 +139,21 @@ public void commit(List<CommitMessage> commitMessages) {
             for (TwoPhaseOutputStream.Committer committer : committers) {
                 committer.commit(this.fileIO);
+                if (partitionKeys != null && !partitionKeys.isEmpty() && hiveCatalog != null) {
+                    partitionSpecs.add(
+                            extractPartitionSpecFromPath(
+                                    committer.targetPath().getParent(), partitionKeys));
+                }
             }
             for (TwoPhaseOutputStream.Committer committer : committers) {
                 committer.clean(this.fileIO);
             }
+            for (Map<String, String> partitionSpec : partitionSpecs) {
+                if (hiveCatalog != null) {
+                    hiveCatalog.createPartitions(
+                            tableIdentifier, Collections.singletonList(partitionSpec));
+                }
+            }
         } catch (Exception e) {
             this.abort(commitMessages);
@@ -118,6 +161,16 @@ public void commit(List<CommitMessage> commitMessages) {
         }
     }

+    private LinkedHashMap<String, String> extractPartitionSpecFromPath(
+            Path partitionPath, List<String> partitionKeys) {
+        if (formatTablePartitionOnlyValueInPath) {
+            return PartitionPathUtils.extractPartitionSpecFromPathOnlyValue(
+                    partitionPath, partitionKeys);
+        } else {
+            return PartitionPathUtils.extractPartitionSpecFromPath(partitionPath);
+        }
+    }
+
     private static Path buildPartitionPath(
             String location,
             Map<String, String> partitionSpec,
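extractPartitionSpecFromPath is the inverse of buildPartitionPath: it recovers a partition spec from a committed file's parent directory, honoring both path layouts. A hedged illustration with made-up paths (extractPartitionSpecFromPathOnlyValue is the helper this patch relies on for the value-only layout):

```java
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.PartitionPathUtils;

import java.util.Collections;
import java.util.LinkedHashMap;

public class PartitionSpecExtractionExample {
    public static void main(String[] args) {
        // Hive-style layout: the key is encoded in the directory name.
        LinkedHashMap<String, String> hiveStyle =
                PartitionPathUtils.extractPartitionSpecFromPath(
                        new Path("/warehouse/db.db/t/ds=2024"));
        System.out.println(hiveStyle); // {ds=2024}

        // Value-only layout (formatTablePartitionOnlyValueInPath == true):
        // the partition keys must be supplied, since the path omits them.
        LinkedHashMap<String, String> valueOnly =
                PartitionPathUtils.extractPartitionSpecFromPathOnlyValue(
                        new Path("/warehouse/db.db/t/2024"),
                        Collections.singletonList("ds"));
        System.out.println(valueOnly); // {ds=2024}
    }
}
```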
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index bc99c35a4a9b..f225d5baf79e 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -516,13 +516,7 @@ public List<org.apache.paimon.partition.Partition> listPartitions(Identifier ide
         String tagToPartitionField = table.coreOptions().tagToPartitionField();
         if (tagToPartitionField != null) {
             try {
-                List<Partition> partitions =
-                        clients.run(
-                                client ->
-                                        client.listPartitions(
-                                                identifier.getDatabaseName(),
-                                                identifier.getTableName(),
-                                                Short.MAX_VALUE));
+                List<Partition> partitions = listPartitionsFromHms(identifier);
                 return partitions.stream()
                         .map(
                                 part -> {
@@ -558,6 +552,17 @@ public List<org.apache.paimon.partition.Partition> listPartitions(Identifier ide
         return listPartitionsFromFileSystem(table);
     }

+    @VisibleForTesting
+    public List<Partition> listPartitionsFromHms(Identifier identifier)
+            throws TException, InterruptedException {
+        return clients.run(
+                client ->
+                        client.listPartitions(
+                                identifier.getDatabaseName(),
+                                identifier.getTableName(),
+                                Short.MAX_VALUE));
+    }
+
     private List<Map<String, String>> removePartitionsExistsInOtherBranches(
             Identifier identifier, List<Map<String, String>> inputs) throws TableNotExistException {
         FileStoreTable mainTable =
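On the catalog side, the refactor only extracts the HMS call so the test below can observe it; the registration done at commit time reduces to Catalog.createPartitions. A hedged sketch of that call (database, table, and spec values are placeholders):

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionSyncExample {
    // hiveCatalog is assumed to be the catalog that FormatTableCommit created
    // from format-table.commit-hive-sync-url.
    public static void syncPartition(Catalog hiveCatalog) throws Exception {
        Map<String, String> spec = new LinkedHashMap<>();
        spec.put("ds", "2024");
        hiveCatalog.createPartitions(
                Identifier.create("my_db", "my_table"), Collections.singletonList(spec));
    }
}
```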
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
index ee423a0a59b6..3845a6b8b9ec 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonHiveTestBase.scala
@@ -31,8 +31,6 @@ import java.io.File

 class PaimonHiveTestBase extends PaimonSparkTestBase {

-  import PaimonHiveTestBase._
-
   protected lazy val tempHiveDBDir: File = Utils.createTempDir

   protected lazy val testHiveMetastore: TestHiveMetastore = new TestHiveMetastore
@@ -43,6 +41,8 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {

   protected val hiveDbName: String = "test_hive"

+  val hiveUri: String = PaimonHiveTestBase.hiveUri
+
   /**
    * Add spark_catalog ([[SparkGenericCatalog]] in hive) and paimon_hive ([[SparkCatalog]] in hive)
    * catalog
@@ -61,7 +61,7 @@ class PaimonHiveTestBase extends PaimonSparkTestBase {
   }

   override protected def beforeAll(): Unit = {
-    testHiveMetastore.start(hivePort)
+    testHiveMetastore.start(PaimonHiveTestBase.hivePort)
     super.beforeAll()
     spark.sql(s"USE $sparkCatalogName")
     spark.sql(s"CREATE DATABASE IF NOT EXISTS $hiveDbName")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 4a895b10b645..573f3c8899d1 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -18,8 +18,9 @@ package org.apache.paimon.spark.sql

-import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.catalog.{DelegateCatalog, Identifier}
 import org.apache.paimon.fs.Path
+import org.apache.paimon.hive.HiveCatalog
 import org.apache.paimon.spark.PaimonHiveTestBase
 import org.apache.paimon.table.FormatTable
 import org.apache.paimon.utils.CompressUtils
@@ -45,6 +46,29 @@ abstract class FormatTableTestBase extends PaimonHiveTestBase {
     }
   }

+  test("Format table: check partition sync") {
+    val tableName = "t"
+    withTable(tableName) {
+      val hiveCatalog =
+        paimonCatalog.asInstanceOf[DelegateCatalog].wrapped().asInstanceOf[HiveCatalog]
+      sql(
+        s"CREATE TABLE $tableName (f0 INT) USING CSV PARTITIONED BY (`ds` bigint) TBLPROPERTIES ('metastore.partitioned-table'='true')")
+      var ds = 2023L
+      sql(s"INSERT INTO $tableName VALUES (1, $ds)")
+      checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, ds)))
+      var partitions = hiveCatalog.listPartitionsFromHms(Identifier.create(hiveDbName, tableName))
+      assert(partitions.size == 0)
+      sql(s"DROP TABLE $tableName")
+      sql(
+        s"CREATE TABLE $tableName (f0 INT) USING CSV PARTITIONED BY (`ds` bigint) TBLPROPERTIES ('format-table.commit-hive-sync-url'='$hiveUri', 'metastore.partitioned-table'='true')")
+      ds = 2024L
+      sql(s"INSERT INTO $tableName VALUES (1, $ds)")
+      checkAnswer(sql(s"SELECT * FROM $tableName"), Seq(Row(1, ds)))
+      partitions = hiveCatalog.listPartitionsFromHms(Identifier.create(hiveDbName, tableName))
+      assert(partitions.get(0).getValues.get(0).equals(ds.toString))
+    }
+  }
+
   test("Format table: write partitioned table") {
     for (
       (format, compression) <- Seq(
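For reference, the catalog bootstrap that FormatTableCommit inlines in its constructor can be read as the standalone helper below. This is a hedged sketch rather than part of the patch; the class and method names are hypothetical, and the Hadoop configuration would come from the table's CatalogContext.

```java
import org.apache.hadoop.conf.Configuration;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

public class HiveSyncCatalogExample {
    public static Catalog createSyncCatalog(String syncHiveUri, Configuration hadoopConf) {
        Options options = new Options();
        options.set(CatalogOptions.URI, syncHiveUri);
        options.set(CatalogOptions.METASTORE, "hive");
        // Requires a Hive catalog implementation (paimon-hive-catalog) on the
        // classpath so that the "hive" metastore identifier can be resolved.
        CatalogContext context = CatalogContext.create(options, hadoopConf);
        return CatalogFactory.createCatalog(context);
    }
}
```

Usage then mirrors the test above: create the format table with 'format-table.commit-hive-sync-url' pointing at the metastore, insert data, and the newly written partitions appear in the HMS.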