Skip to content

Commit 0663320

Browse files
committed
fix
1 parent 75906cc commit 0663320

10 files changed

Lines changed: 144 additions & 43 deletions

src/paimon/common/sst/sst_file_reader.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,13 +131,13 @@ Result<std::shared_ptr<BlockReader>> SstFileReader::ReadBlock(
131131
auto trailer_data = block_cache_->GetBlock(handle->Offset() + handle->Size(),
132132
BlockTrailer::ENCODED_LENGTH, true);
133133
if (!trailer_data) {
134-
return Status::Invalid("read block failed");
134+
return Status::Invalid("Read trailer failed");
135135
}
136136
auto trailer_input = MemorySlice::Wrap(trailer_data)->ToInput();
137137
auto trailer = BlockTrailer::ReadBlockTrailer(trailer_input);
138138
auto block_data = block_cache_->GetBlock(handle->Offset(), handle->Size(), index);
139139
if (!block_data) {
140-
return Status::Invalid("read block failed");
140+
return Status::Invalid("Read block failed");
141141
}
142142
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<MemorySegment> uncompressed_data,
143143
DecompressBlock(block_data, trailer, pool_));

src/paimon/core/core_options_test.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ TEST(CoreOptionsTest, TestFromMap) {
246246
ASSERT_TRUE(core_options.NeedLookup());
247247
LookupStrategy expected_lookup_strategy = {/*is_first_row=*/false,
248248
/*produce_changelog=*/false,
249-
/*deletion_vector=*/true, /*need_lookup=*/true};
249+
/*deletion_vector=*/true, /*force_lookup=*/true};
250250
ASSERT_EQ(expected_lookup_strategy, core_options.GetLookupStrategy());
251251

252252
std::map<std::string, std::string> seq_grp;

src/paimon/core/mergetree/compact/changelog_merge_tree_rewriter.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,20 @@ ChangelogMergeTreeRewriter::ChangelogMergeTreeRewriter(
2020
int32_t max_level, bool force_drop_delete, const BinaryRow& partition, int32_t bucket,
2121
int64_t schema_id, const std::vector<std::string>& trimmed_primary_keys,
2222
const CoreOptions& options, const std::shared_ptr<arrow::Schema>& data_schema,
23-
const std::shared_ptr<arrow::Schema>& write_schema,
23+
const std::shared_ptr<arrow::Schema>& write_schema, DeletionVector::Factory dv_factory,
2424
const std::shared_ptr<FileStorePathFactoryCache>& path_factory_cache,
2525
std::unique_ptr<MergeFileSplitRead>&& merge_file_split_read,
2626
MergeFunctionWrapperFactory merge_function_wrapper_factory,
2727
const std::shared_ptr<MemoryPool>& pool)
2828
: MergeTreeCompactRewriter(partition, bucket, schema_id, trimmed_primary_keys, options,
29-
data_schema, write_schema, path_factory_cache,
29+
data_schema, write_schema, std::move(dv_factory), path_factory_cache,
3030
std::move(merge_file_split_read),
3131
std::move(merge_function_wrapper_factory), pool),
3232
max_level_(max_level),
3333
force_drop_delete_(force_drop_delete) {}
3434

3535
Result<CompactResult> ChangelogMergeTreeRewriter::Rewrite(
36-
int32_t output_level, bool drop_delete,
37-
const std::vector<std::vector<SortedRun>>& sections) {
36+
int32_t output_level, bool drop_delete, const std::vector<std::vector<SortedRun>>& sections) {
3837
if (RewriteChangelog(output_level, drop_delete, sections)) {
3938
return RewriteOrProduceChangelog(output_level, sections, drop_delete,
4039
/*rewrite_compact_file=*/true);
@@ -44,7 +43,7 @@ Result<CompactResult> ChangelogMergeTreeRewriter::Rewrite(
4443
}
4544

4645
Result<CompactResult> ChangelogMergeTreeRewriter::Upgrade(
47-
int32_t output_level, const std::shared_ptr<DataFileMeta>& file) {
46+
int32_t output_level, const std::shared_ptr<DataFileMeta>& file) {
4847
UpgradeStrategy upgrade_strategy = GenerateUpgradeStrategy(output_level, file);
4948
if (upgrade_strategy.changelog) {
5049
return RewriteOrProduceChangelog(output_level, {{SortedRun::FromSingle(file)}},

src/paimon/core/mergetree/compact/changelog_merge_tree_rewriter.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class ChangelogMergeTreeRewriter : public MergeTreeCompactRewriter {
3838
const CoreOptions& options,
3939
const std::shared_ptr<arrow::Schema>& data_schema,
4040
const std::shared_ptr<arrow::Schema>& write_schema,
41+
DeletionVector::Factory dv_factory,
4142
const std::shared_ptr<FileStorePathFactoryCache>& path_factory_cache,
4243
std::unique_ptr<MergeFileSplitRead>&& merge_file_split_read,
4344
MergeFunctionWrapperFactory merge_function_wrapper_factory,

src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,16 @@ LookupMergeTreeCompactRewriter<T>::LookupMergeTreeCompactRewriter(
3030
const BinaryRow& partition, int32_t bucket, int64_t schema_id,
3131
const std::vector<std::string>& trimmed_primary_keys, const CoreOptions& options,
3232
const std::shared_ptr<arrow::Schema>& data_schema,
33-
const std::shared_ptr<arrow::Schema>& write_schema,
33+
const std::shared_ptr<arrow::Schema>& write_schema, DeletionVector::Factory dv_factory,
3434
const std::shared_ptr<FileStorePathFactoryCache>& path_factory_cache,
3535
std::unique_ptr<MergeFileSplitRead>&& merge_file_split_read,
3636
MergeFunctionWrapperFactory merge_function_wrapper_factory,
3737
const std::shared_ptr<MemoryPool>& pool)
38-
: ChangelogMergeTreeRewriter(
39-
max_level, /*force_drop_delete=*/dv_maintainer != nullptr, partition, bucket, schema_id,
40-
trimmed_primary_keys, options, data_schema, write_schema, path_factory_cache,
41-
std::move(merge_file_split_read), std::move(merge_function_wrapper_factory), pool),
38+
: ChangelogMergeTreeRewriter(max_level, /*force_drop_delete=*/dv_maintainer != nullptr,
39+
partition, bucket, schema_id, trimmed_primary_keys, options,
40+
data_schema, write_schema, std::move(dv_factory),
41+
path_factory_cache, std::move(merge_file_split_read),
42+
std::move(merge_function_wrapper_factory), pool),
4243
lookup_levels_(std::move(lookup_levels)),
4344
dv_maintainer_(dv_maintainer) {}
4445

@@ -49,6 +50,7 @@ LookupMergeTreeCompactRewriter<T>::Create(
4950
const std::shared_ptr<BucketedDvMaintainer>& dv_maintainer,
5051
MergeFunctionWrapperFactory merge_function_wrapper_factory, int32_t bucket,
5152
const BinaryRow& partition, const std::shared_ptr<TableSchema>& table_schema,
53+
DeletionVector::Factory dv_factory,
5254
const std::shared_ptr<FileStorePathFactoryCache>& path_factory_cache,
5355
const CoreOptions& options, const std::shared_ptr<MemoryPool>& pool) {
5456
PAIMON_ASSIGN_OR_RAISE(std::vector<std::string> trimmed_primary_keys,
@@ -65,16 +67,17 @@ LookupMergeTreeCompactRewriter<T>::Create(
6567
PAIMON_ASSIGN_OR_RAISE(
6668
std::shared_ptr<InternalReadContext> internal_context,
6769
InternalReadContext::Create(read_context, table_schema, options.ToMap()));
68-
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileStorePathFactory> path_factory,
69-
path_factory_cache->GetOrCreatePathFactory(
70-
options.GetFileFormat()->Identifier()));
70+
PAIMON_ASSIGN_OR_RAISE(
71+
std::shared_ptr<FileStorePathFactory> path_factory,
72+
path_factory_cache->GetOrCreatePathFactory(options.GetFileFormat()->Identifier()));
7173
PAIMON_ASSIGN_OR_RAISE(
7274
std::unique_ptr<MergeFileSplitRead> merge_file_split_read,
7375
MergeFileSplitRead::Create(path_factory, internal_context, pool, CreateDefaultExecutor()));
7476
return std::unique_ptr<LookupMergeTreeCompactRewriter>(new LookupMergeTreeCompactRewriter(
7577
std::move(lookup_levels), dv_maintainer, max_level, partition, bucket, table_schema->Id(),
76-
trimmed_primary_keys, options, data_schema, write_schema, path_factory_cache,
77-
std::move(merge_file_split_read), std::move(merge_function_wrapper_factory), pool));
78+
trimmed_primary_keys, options, data_schema, write_schema, std::move(dv_factory),
79+
path_factory_cache, std::move(merge_file_split_read),
80+
std::move(merge_function_wrapper_factory), pool));
7881
}
7982

8083
template <typename T>

src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class LookupMergeTreeCompactRewriter : public ChangelogMergeTreeRewriter {
3434
const std::shared_ptr<BucketedDvMaintainer>& dv_maintainer,
3535
MergeFunctionWrapperFactory merge_function_wrapper_factory, int32_t bucket,
3636
const BinaryRow& partition, const std::shared_ptr<TableSchema>& table_schema,
37+
DeletionVector::Factory dv_factory,
3738
const std::shared_ptr<FileStorePathFactoryCache>& path_factory_cache,
3839
const CoreOptions& options, const std::shared_ptr<MemoryPool>& pool);
3940

@@ -59,7 +60,7 @@ class LookupMergeTreeCompactRewriter : public ChangelogMergeTreeRewriter {
5960
const BinaryRow& partition, int32_t bucket, int64_t schema_id,
6061
const std::vector<std::string>& trimmed_primary_keys, const CoreOptions& options,
6162
const std::shared_ptr<arrow::Schema>& data_schema,
62-
const std::shared_ptr<arrow::Schema>& write_schema,
63+
const std::shared_ptr<arrow::Schema>& write_schema, DeletionVector::Factory dv_factory,
6364
const std::shared_ptr<FileStorePathFactoryCache>& path_factory_cache,
6465
std::unique_ptr<MergeFileSplitRead>&& merge_file_split_read,
6566
MergeFunctionWrapperFactory merge_function_wrapper_factory,

0 commit comments

Comments
 (0)