@@ -226,7 +226,8 @@ private void open() throws Exception {
         partitionLoader.checkRefresh();
         List<BinaryRow> partitions = partitionLoader.partitions();
         if (!partitions.isEmpty()) {
-            lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
+            lookupTable.specifyPartitions(
+                    partitions, partitionLoader.createSpecificPartFilter());
         }
     }

@@ -327,7 +328,8 @@ void tryRefresh() throws Exception {

         if (partitionChanged) {
             // reopen with latest partition
-            lookupTable.specificPartitionFilter(partitionLoader.createSpecificPartFilter());
+            lookupTable.specifyPartitions(
+                    partitionLoader.partitions(), partitionLoader.createSpecificPartFilter());
             lookupTable.close();
             lookupTable.open();
             // no need to refresh the lookup table because it is reopened
@@ -92,7 +92,8 @@ public abstract class FullCacheLookupTable implements LookupTable {
     private final FileStoreTable table;
     private Future<?> refreshFuture;
     private LookupStreamingReader reader;
-    private Predicate specificPartition;
+    @Nullable private List<BinaryRow> scanPartitions;
+    @Nullable private Predicate partitionFilter;
     @Nullable private Filter<InternalRow> cacheRowFilter;

     public FullCacheLookupTable(Context context) {
@@ -139,8 +140,10 @@ public FullCacheLookupTable(Context context) {
     }

     @Override
-    public void specificPartitionFilter(Predicate filter) {
-        this.specificPartition = filter;
+    public void specifyPartitions(
+            List<BinaryRow> scanPartitions, @Nullable Predicate partitionFilter) {
+        this.scanPartitions = scanPartitions;
+        this.partitionFilter = partitionFilter;
     }

     @Override
@@ -172,14 +175,15 @@ private StateFactory createStateFactory() throws IOException {

     protected void bootstrap() throws Exception {
         Predicate scanPredicate =
-                PredicateBuilder.andNullable(context.tablePredicate, specificPartition);
+                PredicateBuilder.andNullable(context.tablePredicate, partitionFilter);
         this.reader =
                 new LookupStreamingReader(
                         context.table,
                         context.projection,
                         scanPredicate,
                         context.requiredCachedBucketIds,
-                        cacheRowFilter);
+                        cacheRowFilter,
+                        scanPartitions);
         if (!stateFactory.preferBulkLoad()) {
             doRefresh();
             return;
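As an aside on the predicate composition in bootstrap() above: the sketch below is illustrative only, not code from this change. It assumes PredicateBuilder.andNullable AND-combines its non-null arguments and passes a lone non-null argument through unchanged, which is why a lookup table with no partition filter keeps scanning on the table predicate alone. The schema and field indexes here are hypothetical.

// Minimal sketch, assuming andNullable skips null inputs (hypothetical schema: pt, k).
RowType rowType = RowType.of(new IntType(), new IntType());
PredicateBuilder builder = new PredicateBuilder(rowType);
Predicate tablePredicate = builder.greaterThan(1, 0); // k > 0
Predicate partitionFilter = builder.equal(0, 2);      // pt = 2
// Both non-null -> AND(k > 0, pt = 2); with partitionFilter == null -> just k > 0.
Predicate scanPredicate = PredicateBuilder.andNullable(tablePredicate, partitionFilter);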
@@ -20,24 +20,27 @@

 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataTableStreamScan;
-import org.apache.paimon.table.source.TableQueryAuth;
 import org.apache.paimon.table.source.snapshot.AllDeltaFollowUpScanner;
 import org.apache.paimon.table.source.snapshot.BoundedChecker;
 import org.apache.paimon.table.source.snapshot.FollowUpScanner;
 import org.apache.paimon.table.source.snapshot.FullStartingScanner;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
-import org.apache.paimon.utils.ChangelogManager;
-import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.SimpleFileReader;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;
+
+import java.util.List;

 import static org.apache.paimon.CoreOptions.StartupMode;
 import static org.apache.paimon.flink.lookup.LookupFileStoreTable.LookupStreamScanMode;

@@ -49,29 +52,28 @@ public class LookupDataTableScan extends DataTableStreamScan {

     private static final Logger LOG = LoggerFactory.getLogger(LookupDataTableScan.class);

+    private final FileStoreTable table;
     private final StartupMode startupMode;
     private final LookupStreamScanMode lookupScanMode;

+    @Nullable private List<BinaryRow> scanPartitions = null;
+
     public LookupDataTableScan(
-            TableSchema schema,
-            CoreOptions options,
+            FileStoreTable table,
             SnapshotReader snapshotReader,
-            SnapshotManager snapshotManager,
-            ChangelogManager changelogManager,
-            boolean supportStreamingReadOverwrite,
-            LookupStreamScanMode lookupScanMode,
-            TableQueryAuth queryAuth,
-            boolean hasPk) {
+            LookupStreamScanMode lookupScanMode) {
         super(
-                schema,
-                options,
+                table.schema(),
+                table.coreOptions(),
                 snapshotReader,
-                snapshotManager,
-                changelogManager,
-                supportStreamingReadOverwrite,
-                queryAuth,
-                hasPk);
+                table.snapshotManager(),
+                table.changelogManager(),
+                table.supportStreamingReadOverwrite(),
+                table.catalogEnvironment().tableQueryAuth(table.coreOptions()),
+                !table.schema().primaryKeys().isEmpty());
+        CoreOptions options = table.coreOptions();

+        this.table = table;
         this.startupMode = options.startupMode();
         this.lookupScanMode = lookupScanMode;
         dropStats();
@@ -81,15 +83,45 @@ public LookupDataTableScan(
         }
     }

+    public void setScanPartitions(List<BinaryRow> scanPartitions) {
+        this.scanPartitions = scanPartitions;
+    }
+
     @Override
     @Nullable
     protected SnapshotReader.Plan handleOverwriteSnapshot(Snapshot snapshot) {
         SnapshotReader.Plan plan = super.handleOverwriteSnapshot(snapshot);
         if (plan != null) {
             return plan;
         }
-        LOG.info("Dim table found OVERWRITE snapshot {}, reopen.", snapshot.id());
-        throw new ReopenException();
+
+        if (shouldReopen(snapshot)) {
+            LOG.info("Dim table found OVERWRITE snapshot {}, reopen.", snapshot.id());
+            throw new ReopenException();
+        } else {
+            return null;
+        }
     }

+    private boolean shouldReopen(Snapshot snapshot) {
+        if (scanPartitions == null) {
+            return true;
+        }
+
+        List<ManifestFileMeta> manifests =
+                table.manifestListReader().read(snapshot.deltaManifestList());
+        SimpleFileReader<ManifestEntry> manifestReader = table.manifestFileReader();
+        for (ManifestFileMeta manifest : manifests) {
+            List<ManifestEntry> entries = manifestReader.read(manifest.fileName());
+            for (ManifestEntry entry : entries) {
+                BinaryRow partition = entry.partition();
+                if (scanPartitions.contains(partition)) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
     @Override
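Putting the LookupDataTableScan changes in context, the intended flow reads roughly as below. This is a condensed sketch built only from names appearing in this diff (partitionLoader and lookupTable come from the first file above), not a verbatim excerpt.

// Caller side: hand both forms of the partition set to the lookup table.
List<BinaryRow> partitions = partitionLoader.partitions(); // e.g. resolved max_pt()
lookupTable.specifyPartitions(partitions, partitionLoader.createSpecificPartFilter());
lookupTable.open();
// FullCacheLookupTable.bootstrap() forwards both forms downstream:
//   - the predicate narrows the LookupStreamingReader scan,
//   - the partition list reaches LookupDataTableScan.setScanPartitions(...).
// On a later OVERWRITE snapshot, handleOverwriteSnapshot() consults shouldReopen():
// only when a delta manifest entry touches a partition in the list does the scan
// throw ReopenException and force the cache to rebuild.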
@@ -73,16 +73,7 @@ public InnerTableRead newRead() {

     @Override
     public StreamDataTableScan newStreamScan() {
-        return new LookupDataTableScan(
-                wrapped.schema(),
-                wrapped.coreOptions(),
-                wrapped.newSnapshotReader(),
-                wrapped.snapshotManager(),
-                wrapped.changelogManager(),
-                wrapped.supportStreamingReadOverwrite(),
-                lookupScanMode,
-                wrapped.catalogEnvironment().tableQueryAuth(wrapped.coreOptions()),
-                !wrapped.schema().primaryKeys().isEmpty());
+        return new LookupDataTableScan(wrapped, wrapped.newSnapshotReader(), lookupScanMode);
     }

     @Override
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.lookup;

 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.SplitsParallelReadUtil;
 import org.apache.paimon.mergetree.compact.ConcatRecordReader;
@@ -29,7 +30,6 @@
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
-import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.FunctionWithIOException;
@@ -61,14 +61,15 @@ public class LookupStreamingReader {
     @Nullable private final Filter<InternalRow> cacheRowFilter;
     private final ReadBuilder readBuilder;
     @Nullable private final Predicate projectedPredicate;
-    private final StreamTableScan scan;
+    private final LookupDataTableScan scan;

     public LookupStreamingReader(
             LookupFileStoreTable table,
             int[] projection,
             @Nullable Predicate predicate,
             Set<Integer> requireCachedBucketIds,
-            @Nullable Filter<InternalRow> cacheRowFilter) {
+            @Nullable Filter<InternalRow> cacheRowFilter,
+            @Nullable List<BinaryRow> scanPartitions) {
         this.table = table;
         this.projection = projection;
         this.cacheRowFilter = cacheRowFilter;
@@ -81,7 +82,8 @@ public LookupStreamingReader(
                         requireCachedBucketIds == null
                                 ? null
                                 : requireCachedBucketIds::contains);
-        scan = readBuilder.newStreamScan();
+        scan = (LookupDataTableScan) readBuilder.newStreamScan();
+        scan.setScanPartitions(scanPartitions);

         if (predicate != null) {
             List<String> fieldNames = table.rowType().getFieldNames();
@@ -18,18 +18,21 @@

 package org.apache.paimon.flink.lookup;

+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.utils.Filter;

+import javax.annotation.Nullable;

 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;

 /** A lookup table which provides get and refresh. */
 public interface LookupTable extends Closeable {

-    void specificPartitionFilter(Predicate filter);
+    void specifyPartitions(List<BinaryRow> scanPartitions, @Nullable Predicate partitionFilter);

     void open() throws Exception;

@@ -63,7 +63,7 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
     @Nullable private final ProjectedRow keyRearrange;
     @Nullable private final ProjectedRow trimmedKeyRearrange;

-    private Predicate specificPartition;
+    @Nullable private Predicate partitionFilter;
     @Nullable private Filter<InternalRow> cacheRowFilter;
     private QueryExecutor queryExecutor;

@@ -143,13 +143,14 @@ QueryExecutor queryExecutor() {
     }

     @Override
-    public void specificPartitionFilter(Predicate filter) {
-        this.specificPartition = filter;
+    public void specifyPartitions(
+            List<BinaryRow> scanPartitions, @Nullable Predicate partitionFilter) {
+        this.partitionFilter = partitionFilter;
     }

     @Override
     public void open() throws Exception {
-        this.queryExecutor = executorFactory.create(specificPartition, cacheRowFilter);
+        this.queryExecutor = executorFactory.create(partitionFilter, cacheRowFilter);
         refresh();
     }

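Worth noting: PrimaryKeyPartialLookupTable retains only the predicate half of specifyPartitions and leaves the scanPartitions list unused. That reads as intentional, since this implementation answers lookups through a query executor rather than an incrementally refreshed full-cache scan, so there is no reopen decision for the list to drive; the parameter is there to satisfy the shared LookupTable interface.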
@@ -1163,16 +1163,18 @@ public void testLookupSpecifiedPartition(LookupCacheMode mode) throws Exception
         iterator.close();
     }

-    @Test
-    public void testMaxPtAndOverwrite() throws Exception {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testMaxPtAndOverwrite(boolean refreshPartitionQuickly) throws Exception {
         sql(
                 "CREATE TABLE PARTITIONED_DIM (pt INT, k INT, v INT) "
                         + "PARTITIONED BY (`pt`) WITH ("
                         + "'bucket' = '2', "
                         + "'bucket-key' = 'k', "
                         + "'lookup.dynamic-partition' = 'max_pt()', "
-                        + "'lookup.dynamic-partition.refresh-interval' = '99999 s', "
-                        + "'continuous.discovery-interval'='1 ms')");
+                        + "'lookup.dynamic-partition.refresh-interval' = '%s', "
+                        + "'continuous.discovery-interval'='1 ms')",
+                refreshPartitionQuickly ? "10 ms" : "99999 s");
         sql(
                 "INSERT INTO PARTITIONED_DIM VALUES (1, 1, 101), (1, 2, 102), (2, 1, 201), (2, 2, 202)");

@@ -1192,6 +1194,28 @@ public void testMaxPtAndOverwrite() throws Exception {
         result = iterator.collect(3);
         assertThat(result)
                 .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212), Row.of(3, 213));
+
+        // overwrite old partition
+        sql(
+                "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 1) VALUES (1, 111), (2, 112), (3, 113)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        result = iterator.collect(3);
+        assertThat(result)
+                .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212), Row.of(3, 213));
+
+        // overwrite new MAX partition
+        sql(
+                "INSERT OVERWRITE PARTITIONED_DIM PARTITION (pt = 3) VALUES (1, 301), (2, 302), (3, 303)");
+        sql("INSERT INTO T VALUES (1), (2), (3)");
+        result = iterator.collect(3);
+        if (refreshPartitionQuickly) {
+            assertThat(result)
+                    .containsExactlyInAnyOrder(Row.of(1, 301), Row.of(2, 302), Row.of(3, 303));
+        } else {
+            // MAX_PT isn't changed
+            assertThat(result)
+                    .containsExactlyInAnyOrder(Row.of(1, 211), Row.of(2, 212), Row.of(3, 213));
+        }
     }

     @Test
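The two new assertion blocks encode the point of the change: overwriting pt = 1, which lies outside the cached max_pt() scan set, no longer disturbs the lookup cache in either configuration. Overwriting into the brand-new partition pt = 3 (created by that very statement) is only picked up when the refresh interval is short enough for the partition loader to re-resolve max_pt() to 3 and reopen the scan; with the 99999 s interval the scan set still says pt = 2, so the overwrite is ignored and the join keeps serving the 2xx values.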