6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -548,6 +548,12 @@
<td>Boolean</td>
<td>Whether to force the use of lookup for compaction.</td>
</tr>
<tr>
<td><h5>format-table.commit-hive-sync-url</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Hive metastore URI used to synchronize partitions when committing to a format table.</td>
</tr>
<tr>
<td><h5>format-table.file.compression</h5></td>
<td style="word-wrap: break-word;">(none)</td>
9 changes: 9 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2030,6 +2030,11 @@ public InlineElement getDescription() {
.noDefaultValue()
.withFallbackKeys(FILE_COMPRESSION.key())
.withDescription("Format table file compression.");
public static final ConfigOption<String> FORMAT_TABLE_COMMIT_HIVE_SYNC_URI =
ConfigOptions.key("format-table.commit-hive-sync-url")
.stringType()
.noDefaultValue()
.withDescription("Format table commit hive sync uri.");

public static final ConfigOption<String> BLOB_FIELD =
key("blob-field")
@@ -2326,6 +2331,10 @@ public String formatTableFileCompression() {
}
}

public String formatTableCommitSyncPartitionHiveUri() {
return options.get(FORMAT_TABLE_COMMIT_HIVE_SYNC_URI);
}

public MemorySize fileReaderAsyncThreshold() {
return options.get(FILE_READER_ASYNC_THRESHOLD);
}
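The new accessor can be exercised on its own; a minimal sketch, assuming the map-backed `CoreOptions` constructor already used elsewhere in this diff (the URI value is a placeholder):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.CoreOptions;

public class ReadSyncUriSketch {
    public static void main(String[] args) {
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put("format-table.commit-hive-sync-url", "thrift://localhost:9083");
        CoreOptions options = new CoreOptions(tableOptions);
        // Returns the configured URI, or null when the option is unset.
        System.out.println(options.formatTableCommitSyncPartitionHiveUri());
    }
}
```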
@@ -239,7 +239,7 @@ public static Table loadTable(
Function<Path, FileIO> dataFileIO = metadata.isExternal() ? externalFileIO : internalFileIO;

if (options.type() == TableType.FORMAT_TABLE) {
return toFormatTable(identifier, schema, dataFileIO);
return toFormatTable(identifier, schema, dataFileIO, catalogContext);
}

if (options.type() == TableType.OBJECT_TABLE) {
@@ -379,7 +379,10 @@ private static Table createSystemTable(Identifier identifier, Table originTable)
}

private static FormatTable toFormatTable(
Identifier identifier, TableSchema schema, Function<Path, FileIO> fileIO) {
Identifier identifier,
TableSchema schema,
Function<Path, FileIO> fileIO,
CatalogContext catalogContext) {
Map<String, String> options = schema.options();
FormatTable.Format format =
FormatTable.parseFormat(
@@ -396,6 +399,7 @@ private static FormatTable toFormatTable(
.format(format)
.options(options)
.comment(schema.comment())
.catalogContext(catalogContext)
.build();
}

32 changes: 29 additions & 3 deletions paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -20,6 +20,7 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -68,6 +69,8 @@ public interface FormatTable extends Table {
@Override
FormatTable copy(Map<String, String> dynamicOptions);

CatalogContext catalogContext();

/** Currently supported formats. */
enum Format {
ORC,
@@ -105,6 +108,7 @@ class Builder {
private Format format;
private Map<String, String> options;
@Nullable private String comment;
private CatalogContext catalogContext;

public Builder fileIO(FileIO fileIO) {
this.fileIO = fileIO;
@@ -146,9 +150,22 @@ public Builder comment(@Nullable String comment) {
return this;
}

public Builder catalogContext(CatalogContext catalogContext) {
this.catalogContext = catalogContext;
return this;
}

public FormatTable build() {
return new FormatTableImpl(
fileIO, identifier, rowType, partitionKeys, location, format, options, comment);
fileIO,
identifier,
rowType,
partitionKeys,
location,
format,
options,
comment,
catalogContext);
}
}

@@ -165,6 +182,7 @@ class FormatTableImpl implements FormatTable {
private final Format format;
private final Map<String, String> options;
@Nullable private final String comment;
    private final CatalogContext catalogContext;

public FormatTableImpl(
FileIO fileIO,
@@ -174,7 +192,8 @@ public FormatTableImpl(
String location,
Format format,
Map<String, String> options,
@Nullable String comment) {
@Nullable String comment,
CatalogContext catalogContext) {
this.fileIO = fileIO;
this.identifier = identifier;
this.rowType = rowType;
@@ -183,6 +202,7 @@
this.format = format;
this.options = options;
this.comment = comment;
this.catalogContext = catalogContext;
}

@Override
@@ -247,7 +267,13 @@ public FormatTable copy(Map<String, String> dynamicOptions) {
location,
format,
newOptions,
comment);
comment,
catalogContext);
}

@Override
public CatalogContext catalogContext() {
return this.catalogContext;
}
}

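The builder now threads a `CatalogContext` through to the table. A sketch of the extended chain, assuming the existing `builder()` factory and setters; the identifier, location, and format values are placeholders:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.types.RowType;

public class FormatTableBuilderSketch {

    // Hypothetical wiring; all concrete values below are placeholders.
    static FormatTable build(
            FileIO fileIO, RowType rowType, Map<String, String> options, CatalogContext context) {
        return FormatTable.builder()
                .fileIO(fileIO)
                .identifier(Identifier.create("my_db", "my_table"))
                .rowType(rowType)
                .partitionKeys(Collections.singletonList("dt"))
                .location("/warehouse/my_db.db/my_table")
                .format(FormatTable.Format.PARQUET)
                .options(options)
                .comment(null)
                // New in this PR: carried through so the commit path can reuse
                // the catalog's Hadoop configuration for the Hive sync catalog.
                .catalogContext(context)
                .build();
    }
}
```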
@@ -19,6 +19,7 @@
package org.apache.paimon.table.format;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
@@ -74,15 +75,19 @@ public BatchTableWrite newWrite() {

@Override
public BatchTableCommit newCommit() {
boolean formatTablePartitionOnlyValueInPath =
(new CoreOptions(table.options())).formatTablePartitionOnlyValueInPath();
CoreOptions options = new CoreOptions(table.options());
boolean formatTablePartitionOnlyValueInPath = options.formatTablePartitionOnlyValueInPath();
String syncHiveUri = options.formatTableCommitSyncPartitionHiveUri();
return new FormatTableCommit(
table.location(),
table.partitionKeys(),
table.fileIO(),
formatTablePartitionOnlyValueInPath,
overwrite,
staticPartition);
Identifier.fromString(table.fullName()),
staticPartition,
syncHiveUri,
table.catalogContext());
}

@Override
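Callers are unaffected: the sync URI and table identifier are resolved inside `newCommit()`. A generic write-and-commit sketch against the public `Table` API (the row values and schema are placeholders):

```java
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;

public class FormatTableWriteSketch {

    static void writeAndCommit(Table table) throws Exception {
        BatchWriteBuilder builder = table.newBatchWriteBuilder();
        try (BatchTableWrite write = builder.newWrite();
                BatchTableCommit commit = builder.newCommit()) {
            write.write(GenericRow.of(1, BinaryString.fromString("2024-01-01")));
            // With format-table.commit-hive-sync-url set, this commit also
            // registers the written partitions in the Hive Metastore.
            commit.commit(write.prepareCommit());
        }
    }
}
```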
@@ -18,22 +18,31 @@

package org.apache.paimon.table.format;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.TwoPhaseOutputStream;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommit;
import org.apache.paimon.utils.PartitionPathUtils;

import javax.annotation.Nullable;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -50,21 +59,42 @@ public class FormatTableCommit implements BatchTableCommit {
private List<String> partitionKeys;
protected Map<String, String> staticPartitions;
protected boolean overwrite = false;
private Catalog hiveCatalog;
private Identifier tableIdentifier;

public FormatTableCommit(
String location,
List<String> partitionKeys,
FileIO fileIO,
boolean formatTablePartitionOnlyValueInPath,
boolean overwrite,
@Nullable Map<String, String> staticPartitions) {
Identifier tableIdentifier,
@Nullable Map<String, String> staticPartitions,
@Nullable String syncHiveUri,
CatalogContext catalogContext) {
this.location = location;
this.fileIO = fileIO;
this.formatTablePartitionOnlyValueInPath = formatTablePartitionOnlyValueInPath;
validateStaticPartition(staticPartitions, partitionKeys);
this.staticPartitions = staticPartitions;
this.overwrite = overwrite;
this.partitionKeys = partitionKeys;
this.tableIdentifier = tableIdentifier;
if (syncHiveUri != null) {
try {
Options options = new Options();
options.set(CatalogOptions.URI, syncHiveUri);
options.set(CatalogOptions.METASTORE, "hive");
CatalogContext context =
CatalogContext.create(options, catalogContext.hadoopConf());
this.hiveCatalog = CatalogFactory.createCatalog(context);
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to initialize Hive catalog with URI: %s", syncHiveUri),
e);
}
}
}

@Override
@@ -81,14 +111,16 @@ public void commit(List<CommitMessage> commitMessages) {
}
}

Set<Map<String, String>> partitionSpecs = new HashSet<>();

if (staticPartitions != null && !staticPartitions.isEmpty()) {
Path partitionPath =
buildPartitionPath(
location,
staticPartitions,
formatTablePartitionOnlyValueInPath,
partitionKeys);

partitionSpecs.add(staticPartitions);
if (overwrite) {
deletePreviousDataFile(partitionPath);
}
@@ -107,17 +139,38 @@

for (TwoPhaseOutputStream.Committer committer : committers) {
committer.commit(this.fileIO);
if (partitionKeys != null && !partitionKeys.isEmpty() && hiveCatalog != null) {
partitionSpecs.add(
extractPartitionSpecFromPath(
committer.targetPath().getParent(), partitionKeys));
}
}
for (TwoPhaseOutputStream.Committer committer : committers) {
committer.clean(this.fileIO);
}
for (Map<String, String> partitionSpec : partitionSpecs) {
if (hiveCatalog != null) {
hiveCatalog.createPartitions(
tableIdentifier, Collections.singletonList(partitionSpec));
}
}

} catch (Exception e) {
this.abort(commitMessages);
throw new RuntimeException(e);
}
}

private LinkedHashMap<String, String> extractPartitionSpecFromPath(
Path partitionPath, List<String> partitionKeys) {
if (formatTablePartitionOnlyValueInPath) {
return PartitionPathUtils.extractPartitionSpecFromPathOnlyValue(
partitionPath, partitionKeys);
} else {
return PartitionPathUtils.extractPartitionSpecFromPath(partitionPath);
}
}

private static Path buildPartitionPath(
String location,
Map<String, String> partitionSpec,
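`extractPartitionSpecFromPath` covers the two directory layouts a format table may use: Hive-style `key=value` segments, and value-only segments when `formatTablePartitionOnlyValueInPath` is enabled. A sketch of the expected behavior, assuming the `PartitionPathUtils` helpers act as their names suggest:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;

import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.PartitionPathUtils;

public class PartitionSpecSketch {
    public static void main(String[] args) {
        List<String> partitionKeys = Arrays.asList("dt", "hr");

        // Hive-style layout: the path segments carry the keys themselves.
        LinkedHashMap<String, String> hiveStyle =
                PartitionPathUtils.extractPartitionSpecFromPath(
                        new Path("/warehouse/t/dt=2024-01-01/hr=08"));
        System.out.println(hiveStyle); // expected: {dt=2024-01-01, hr=08}

        // Value-only layout: keys must be supplied from the table schema.
        LinkedHashMap<String, String> valueOnly =
                PartitionPathUtils.extractPartitionSpecFromPathOnlyValue(
                        new Path("/warehouse/t/2024-01-01/08"), partitionKeys);
        System.out.println(valueOnly); // expected: {dt=2024-01-01, hr=08}
    }
}
```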
@@ -516,13 +516,7 @@ public List<org.apache.paimon.partition.Partition> listPartitions(Identifier ide
String tagToPartitionField = table.coreOptions().tagToPartitionField();
if (tagToPartitionField != null) {
try {
List<Partition> partitions =
clients.run(
client ->
client.listPartitions(
identifier.getDatabaseName(),
identifier.getTableName(),
Short.MAX_VALUE));
List<Partition> partitions = listPartitionsFromHms(identifier);
return partitions.stream()
.map(
part -> {
@@ -558,6 +552,17 @@ public List<org.apache.paimon.partition.Partition> listPartitions(Identifier ide
return listPartitionsFromFileSystem(table);
}

@VisibleForTesting
public List<Partition> listPartitionsFromHms(Identifier identifier)
throws TException, InterruptedException {
return clients.run(
client ->
client.listPartitions(
identifier.getDatabaseName(),
identifier.getTableName(),
Short.MAX_VALUE));
}

private List<Map<String, String>> removePartitionsExistsInOtherBranches(
Identifier identifier, List<Map<String, String>> inputs) throws TableNotExistException {
FileStoreTable mainTable =
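Once synced, the partitions are visible through the same catalog API the commit path uses. A hedged round-trip sketch; the URI, database, and table names are placeholders, and a reachable metastore is assumed:

```java
import java.util.Collections;

import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

public class HiveSyncRoundTripSketch {
    public static void main(String[] args) throws Exception {
        // Same bootstrap FormatTableCommit performs when the sync URI is set.
        Options options = new Options();
        options.set(CatalogOptions.URI, "thrift://localhost:9083"); // placeholder
        options.set(CatalogOptions.METASTORE, "hive");
        CatalogContext context = CatalogContext.create(options, new Configuration());
        try (Catalog hive = CatalogFactory.createCatalog(context)) {
            // What the commit path does for each committed partition spec.
            hive.createPartitions(
                    Identifier.create("my_db", "my_table"),
                    Collections.singletonList(Collections.singletonMap("dt", "2024-01-01")));
        }
    }
}
```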
@@ -31,8 +31,6 @@ import java.io.File

class PaimonHiveTestBase extends PaimonSparkTestBase {

import PaimonHiveTestBase._

protected lazy val tempHiveDBDir: File = Utils.createTempDir

protected lazy val testHiveMetastore: TestHiveMetastore = new TestHiveMetastore
@@ -43,6 +41,8 @@

protected val hiveDbName: String = "test_hive"

val hiveUri: String = PaimonHiveTestBase.hiveUri

/**
* Add spark_catalog ([[SparkGenericCatalog]] in hive) and paimon_hive ([[SparkCatalog]] in hive)
* catalog
@@ -61,7 +61,7 @@
}

override protected def beforeAll(): Unit = {
testHiveMetastore.start(hivePort)
testHiveMetastore.start(PaimonHiveTestBase.hivePort)
super.beforeAll()
spark.sql(s"USE $sparkCatalogName")
spark.sql(s"CREATE DATABASE IF NOT EXISTS $hiveDbName")