feat: support LookupMergeTreeCompactRewriter#186
feat: support LookupMergeTreeCompactRewriter#186lszskye wants to merge 10 commits intoalibaba:mainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds lookup-based merge-tree compaction rewriting (via LookupMergeTreeCompactRewriter) and introduces per-level file-format selection, updating compaction/read/write paths and expanding unit tests accordingly.
Changes:
- Introduces
LookupMergeTreeCompactRewriter+ChangelogMergeTreeRewriterto support lookup-driven rewrite/upgrade flows (including DV-aware paths). - Adds
file.format.per.levelsupport inCoreOptionsand updates call sites to useGetFileFormat()/GetWriteFileFormat(level). - Adds
FileStorePathFactoryCacheand new tests for lookup rewrite behavior and wrapper logic.
Reviewed changes
Copilot reviewed 55 out of 55 changed files in this pull request and generated 14 comments.
Show a summary per file
| File | Description |
|---|---|
| src/paimon/core/utils/file_store_path_factory_test.cpp | Updates tests to use GetFileFormat() for path factory creation. |
| src/paimon/core/utils/file_store_path_factory_cache_test.cpp | Adds unit test for the new path-factory cache. |
| src/paimon/core/utils/file_store_path_factory_cache.h | Introduces a cache to reuse FileStorePathFactory by format identifier. |
| src/paimon/core/table/source/table_scan.cpp | Switches path factory creation to use GetFileFormat(). |
| src/paimon/core/table/source/table_read.cpp | Switches path factory creation to use GetFileFormat(). |
| src/paimon/core/postpone/postpone_bucket_writer.cpp | Updates write-format selection to GetWriteFileFormat(level). |
| src/paimon/core/postpone/postpone_bucket_file_store_write.h | Updates write-format selection to GetWriteFileFormat(level). |
| src/paimon/core/options/lookup_strategy.h | Adds LookupStrategy struct encapsulating lookup decision inputs. |
| src/paimon/core/operation/raw_file_split_read_test.cpp | Switches path factory creation to use GetFileFormat(). |
| src/paimon/core/operation/orphan_files_cleaner.cpp | Switches path factory creation to use GetFileFormat(). |
| src/paimon/core/operation/merge_file_split_read_test.cpp | Switches path factory creation to use GetFileFormat(). |
| src/paimon/core/operation/merge_file_split_read.h | Adds API to inject a merge-function wrapper and refactors wrapper retrieval. |
| src/paimon/core/operation/merge_file_split_read.cpp | Implements SetMergeFunctionWrapper. |
| src/paimon/core/operation/manifest_file_merger_test.cpp | Switches path factory creation to use GetFileFormat(). |
| src/paimon/core/operation/key_value_file_store_scan_test.cpp | Switches path factory creation to use GetFileFormat(). |
| src/paimon/core/operation/file_store_write.cpp | Uses GetWriteFileFormat(level) for write paths. |
| src/paimon/core/operation/file_store_commit.cpp | Uses GetFileFormat() and updates assertions accordingly. |
| src/paimon/core/operation/expire_snapshots_test.cpp | Switches path factory creation to use GetFileFormat(). |
| src/paimon/core/operation/append_only_file_store_write.cpp | Uses GetFileFormat() for writer creation. |
| src/paimon/core/migrate/file_meta_utils.cpp | Uses GetFileFormat() for migration commit message generation. |
| src/paimon/core/mergetree/merge_tree_writer.cpp | Updates write-format selection to GetWriteFileFormat(level). |
| src/paimon/core/mergetree/lookup_levels_test.cpp | Adds coverage for closing and tmp-dir cleanup behavior. |
| src/paimon/core/mergetree/lookup_levels.h | Adds Close() to clear lookup cache. |
| src/paimon/core/mergetree/compact/reducer_merge_function_wrapper.h | Changes GetResult() to reset wrapper state after producing a result. |
| src/paimon/core/mergetree/compact/merge_tree_compact_rewriter_test.cpp | Updates rewriter creation to use FileStorePathFactoryCache. |
| src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.h | Refactors rewriter to use path-factory cache; adds wrapper factory injection points. |
| src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp | Implements per-level format writing and wrapper injection during merge-read. |
| src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter_test.cpp | Adds comprehensive tests for lookup-based rewrite/upgrade and DV behavior. |
| src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.h | Introduces lookup-based rewriter interface and wrapper factories. |
| src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp | Implements lookup-based rewrite/upgrade decisions and DV updates. |
| src/paimon/core/mergetree/compact/lookup_merge_function_test.cpp | Adds tests for high-level selection and insertion ordering. |
| src/paimon/core/mergetree/compact/lookup_merge_function.h | Enhances merge function to track key, level-0 presence, and pick high-level candidate. |
| src/paimon/core/mergetree/compact/lookup_changelog_merge_function_wrapper_test.cpp | Adds tests for lookup-changelog wrapper behavior including DV. |
| src/paimon/core/mergetree/compact/lookup_changelog_merge_function_wrapper.h | Introduces wrapper that performs lookup/DV marking and merges candidates. |
| src/paimon/core/mergetree/compact/first_row_merge_function_wrapper_test.cpp | Adds tests for first-row lookup wrapper behavior. |
| src/paimon/core/mergetree/compact/first_row_merge_function_wrapper.h | Adds wrapper for first-row lookup behavior. |
| src/paimon/core/mergetree/compact/first_row_merge_function.h | Exposes ContainsHighLevel() for wrapper logic. |
| src/paimon/core/mergetree/compact/compact_rewriter.h | Changes Upgrade() to be non-const. |
| src/paimon/core/mergetree/compact/changelog_merge_tree_rewriter.h | Adds a base rewriter that can rewrite and/or produce changelog per strategy. |
| src/paimon/core/mergetree/compact/changelog_merge_tree_rewriter.cpp | Implements rewrite/upgrade routing based on strategy. |
| src/paimon/core/manifest/manifest_entry_writer_test.cpp | Updates write-format selection to GetWriteFileFormat(level). |
| src/paimon/core/key_value.h | Makes KeyValue copyable and changes value ownership to shared_ptr. |
| src/paimon/core/io/single_file_writer_test.cpp | Updates write-format selection to GetWriteFileFormat(level). |
| src/paimon/core/global_index/global_index_write_task.cpp | Switches path factory creation to use GetFileFormat(). |
| src/paimon/core/global_index/global_index_scan_impl.cpp | Switches path factory creation to use GetFileFormat(). |
| src/paimon/core/core_options_test.cpp | Adds coverage for per-level formats and lookup strategy; adds invalid-format tests. |
| src/paimon/core/core_options.h | Adds APIs: GetFileFormat(), GetWriteFileFormat(level), GetLookupStrategy(). |
| src/paimon/core/core_options.cpp | Implements per-level format parsing and lookup strategy computation. |
| src/paimon/core/append/append_only_writer.cpp | Uses GetFileFormat() for writer creation. |
| src/paimon/common/sst/sst_file_reader.cpp | Adds null checks for cached block reads and returns errors on failure. |
| src/paimon/common/io/cache/cache_manager.cpp | Handles null returns from cache get path. |
| src/paimon/common/defs.cpp | Adds new option key file.format.per.level. |
| src/paimon/common/data/generic_row.h | Changes data holder storage from unique_ptr to shared_ptr. |
| src/paimon/CMakeLists.txt | Registers new rewriter sources and new unit tests. |
| include/paimon/defs.h | Documents new file.format.per.level option. |
Comments suppressed due to low confidence (6)
src/paimon/core/utils/file_store_path_factory_cache_test.cpp:1
- This test accesses
FileStorePathFactoryCache::format_to_path_factory_andFileStorePathFactory::format_identifier_directly. Those members are private in the new cache header (and likely private inFileStorePathFactory), so this will not compile. Prefer asserting the cache behavior via public surface: add aSize()accessor (or similar) onFileStorePathFactoryCacheand validate format via a public getter onFileStorePathFactory(or by observing generated paths/extensions), or declare the test as a friend if you intentionally want to white-box test internals.
src/paimon/core/utils/file_store_path_factory_cache_test.cpp:1 - This test accesses
FileStorePathFactoryCache::format_to_path_factory_andFileStorePathFactory::format_identifier_directly. Those members are private in the new cache header (and likely private inFileStorePathFactory), so this will not compile. Prefer asserting the cache behavior via public surface: add aSize()accessor (or similar) onFileStorePathFactoryCacheand validate format via a public getter onFileStorePathFactory(or by observing generated paths/extensions), or declare the test as a friend if you intentionally want to white-box test internals.
src/paimon/core/utils/file_store_path_factory_cache_test.cpp:1 - This test accesses
FileStorePathFactoryCache::format_to_path_factory_andFileStorePathFactory::format_identifier_directly. Those members are private in the new cache header (and likely private inFileStorePathFactory), so this will not compile. Prefer asserting the cache behavior via public surface: add aSize()accessor (or similar) onFileStorePathFactoryCacheand validate format via a public getter onFileStorePathFactory(or by observing generated paths/extensions), or declare the test as a friend if you intentionally want to white-box test internals.
src/paimon/core/utils/file_store_path_factory_cache_test.cpp:1 - This test accesses
FileStorePathFactoryCache::format_to_path_factory_andFileStorePathFactory::format_identifier_directly. Those members are private in the new cache header (and likely private inFileStorePathFactory), so this will not compile. Prefer asserting the cache behavior via public surface: add aSize()accessor (or similar) onFileStorePathFactoryCacheand validate format via a public getter onFileStorePathFactory(or by observing generated paths/extensions), or declare the test as a friend if you intentionally want to white-box test internals.
src/paimon/core/mergetree/compact/reducer_merge_function_wrapper.h:1 Reset()is only called on the success path. Ifmerge_function_->GetResult()returns an error,PAIMON_ASSIGN_OR_RAISEwill return early and skip resetting state, which can leave the wrapper in a partially-initialized state for subsequent use. Consider using a scope guard to ensureReset()runs regardless of success/failure (or explicitly reset before returning the error).
src/paimon/core/mergetree/lookup_levels.h:1- Correct the typo in the comment: 'TODDO' should be 'TODO'.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
You can also share your feedback on Copilot code review. Take the survey.
src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp
Show resolved
Hide resolved
src/paimon/core/mergetree/compact/lookup_changelog_merge_function_wrapper_test.cpp
Show resolved
Hide resolved
src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter_test.cpp
Show resolved
Hide resolved
src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter_test.cpp
Show resolved
Hide resolved
520e16c to
f47c354
Compare
baccb40 to
48a2190
Compare
There was a problem hiding this comment.
Pull request overview
Adds lookup-based merge-tree compaction rewriting (incl. changelog/DV-related behavior) and introduces per-level file format support in CoreOptions, enabling rewrite/compaction flows that require historical lookup (Issue #93).
Changes:
- Introduce
LookupMergeTreeCompactRewriter+ChangelogMergeTreeRewriterand new merge-function wrappers for lookup/first-row behaviors. - Add
file.format.per.levelparsing and update call sites to useGetFileFormat()/GetWriteFileFormat(level). - Add
FileStorePathFactoryCacheto reuseFileStorePathFactoryinstances per format identifier, and expand tests accordingly.
Reviewed changes
Copilot reviewed 54 out of 54 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| src/paimon/core/utils/file_store_path_factory_test.cpp | Update tests to use GetFileFormat() API. |
| src/paimon/core/utils/file_store_path_factory_cache_test.cpp | New unit test for format→path-factory caching behavior. |
| src/paimon/core/utils/file_store_path_factory_cache.h | New cache for FileStorePathFactory keyed by format identifier. |
| src/paimon/core/table/source/table_scan.cpp | Switch path-factory creation to GetFileFormat(). |
| src/paimon/core/table/source/table_read.cpp | Switch path-factory creation to GetFileFormat(). |
| src/paimon/core/postpone/postpone_bucket_writer.cpp | Use GetWriteFileFormat(level=0) for writers. |
| src/paimon/core/postpone/postpone_bucket_file_store_write.h | Use GetWriteFileFormat(level=0) for path-factory identifier. |
| src/paimon/core/options/lookup_strategy.h | New LookupStrategy struct encapsulating lookup needs. |
| src/paimon/core/operation/raw_file_split_read_test.cpp | Switch to GetFileFormat() when creating path factory. |
| src/paimon/core/operation/orphan_files_cleaner.cpp | Switch to GetFileFormat() when creating path factory. |
| src/paimon/core/operation/merge_file_split_read_test.cpp | Switch to GetFileFormat() when creating path factory. |
| src/paimon/core/operation/merge_file_split_read.h | Add ability to inject a MergeFunctionWrapper into split read. |
| src/paimon/core/operation/merge_file_split_read.cpp | Implement injected merge-function wrapper + lazy wrapper creation. |
| src/paimon/core/operation/manifest_file_merger_test.cpp | Switch to GetFileFormat() when creating path factory. |
| src/paimon/core/operation/key_value_file_store_scan_test.cpp | Switch to GetFileFormat() when creating path factory. |
| src/paimon/core/operation/file_store_write.cpp | Use GetWriteFileFormat(level=0) for write path-factory identifier. |
| src/paimon/core/operation/file_store_commit.cpp | Replace GetWriteFileFormat() assertions/usage with GetFileFormat(). |
| src/paimon/core/operation/expire_snapshots_test.cpp | Switch to GetFileFormat() when creating path factory. |
| src/paimon/core/operation/append_only_file_store_write.cpp | Switch writer format accessor to GetFileFormat(). |
| src/paimon/core/migrate/file_meta_utils.cpp | Switch to GetFileFormat() for migration commit message generation. |
| src/paimon/core/mergetree/merge_tree_writer.cpp | Use GetWriteFileFormat(level=0) for data writer construction. |
| src/paimon/core/mergetree/lookup_levels_test.cpp | Extend tests to validate local lookup-file cleanup on close. |
| src/paimon/core/mergetree/lookup_levels.h | Add Close() that clears lookup-file cache (triggering local file deletion). |
| src/paimon/core/mergetree/compact/reducer_merge_function_wrapper.h | Ensure GetResult() resets internal state after producing a result. |
| src/paimon/core/mergetree/compact/merge_tree_compact_rewriter_test.cpp | Update to use FileStorePathFactoryCache + new Create signature. |
| src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.h | Extend rewriter to support path-factory cache, DV factory, wrapper injection. |
| src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp | Implement per-level write format, cache-backed path factories, wrapper injection. |
| src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter_test.cpp | New comprehensive tests for lookup-based rewriter, DV behavior, per-level formats, IO failures. |
| src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.h | New lookup-based compact rewriter interface. |
| src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter.cpp | New lookup-based compact rewriter implementation + template instantiations. |
| src/paimon/core/mergetree/compact/lookup_merge_function_test.cpp | Add coverage for high-level selection and insertion ordering. |
| src/paimon/core/mergetree/compact/lookup_merge_function.h | Enhance lookup merge function with key/level tracking and insertion utilities. |
| src/paimon/core/mergetree/compact/lookup_changelog_merge_function_wrapper_test.cpp | New tests for changelog-by-lookup merge wrapper and DV interaction. |
| src/paimon/core/mergetree/compact/lookup_changelog_merge_function_wrapper.h | New wrapper to support lookup-based changelog/DV behaviors during compaction. |
| src/paimon/core/mergetree/compact/first_row_merge_function_wrapper_test.cpp | New tests for first-row wrapper behavior with/without historical containment. |
| src/paimon/core/mergetree/compact/first_row_merge_function_wrapper.h | New wrapper to support first-row lookup-based changelog decisions. |
| src/paimon/core/mergetree/compact/first_row_merge_function.h | Expose ContainsHighLevel() for wrapper logic. |
| src/paimon/core/mergetree/compact/compact_rewriter.h | Make Upgrade() non-const to match new rewriter implementations. |
| src/paimon/core/mergetree/compact/changelog_merge_tree_rewriter.h | New base rewriter that can optionally produce changelog during rewrite/upgrade. |
| src/paimon/core/mergetree/compact/changelog_merge_tree_rewriter.cpp | Implement shared rewrite/upgrade flow for changelog-producing rewriters. |
| src/paimon/core/manifest/manifest_entry_writer_test.cpp | Update to GetWriteFileFormat(level=0). |
| src/paimon/core/key_value.h | Adjust KeyValue ctor to take key/value by value (not rvalue refs). |
| src/paimon/core/io/single_file_writer_test.cpp | Update to GetWriteFileFormat(level=0). |
| src/paimon/core/global_index/global_index_write_task.cpp | Switch to GetFileFormat() for path-factory identifier. |
| src/paimon/core/global_index/global_index_scan_impl.cpp | Switch to GetFileFormat() for path-factory identifier. |
| src/paimon/core/core_options_test.cpp | Add coverage for per-level formats + lookup strategy struct. |
| src/paimon/core/core_options.h | Add GetFileFormat(), GetWriteFileFormat(level), GetLookupStrategy(). |
| src/paimon/core/core_options.cpp | Parse file.format.per.level, normalize defaults, implement lookup strategy. |
| src/paimon/core/append/append_only_writer.cpp | Switch writer format accessor to GetFileFormat(). |
| src/paimon/common/sst/sst_file_reader.cpp | Add null checks for cached blocks/trailers to avoid invalid reads. |
| src/paimon/common/io/cache/cache_manager.cpp | Handle cache get returning null before dereferencing. |
| src/paimon/common/defs.cpp | Add option key constant for file.format.per.level. |
| src/paimon/CMakeLists.txt | Register new sources/tests for lookup/changelog rewriters and caches. |
| include/paimon/defs.h | Document file.format.per.level option. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| assert(result); | ||
| PAIMON_ASSIGN_OR_RAISE(bool contains, contains_(result.value().key)); |
There was a problem hiding this comment.
FirstRowMergeFunctionWrapper::GetResult() asserts that merge_function_->GetResult() has a value when ContainsHighLevel() is false, but FirstRowMergeFunction can legitimately return std::nullopt (e.g., when all inputs are retract records and ignore_delete is enabled). This can trigger an assertion failure in normal operation. Handle the !result case explicitly (return nullopt after reset) before dereferencing/looking up the key.
| assert(result); | |
| PAIMON_ASSIGN_OR_RAISE(bool contains, contains_(result.value().key)); | |
| if (!result) { | |
| // No low-level result produced (e.g., all retracts with ignore_delete enabled). | |
| Reset(); | |
| return std::optional<KeyValue>(); | |
| } | |
| PAIMON_ASSIGN_OR_RAISE(bool contains, contains_(result->key)); |
| const std::shared_ptr<FileStorePathFactoryCache>& path_factory_cache, | ||
| const CoreOptions& options, const std::shared_ptr<MemoryPool>& pool); | ||
|
|
||
| Status Close() override { |
There was a problem hiding this comment.
Close() unconditionally dereferences lookup_levels_, but Create() accepts std::unique_ptr<LookupLevels<T>>&& without validating it is non-null. If a caller passes a null unique_ptr, this will crash. Add input validation in Create() (or guard in Close()) to return an error when lookup_levels is null.
| Status Close() override { | |
| Status Close() override { | |
| if (!lookup_levels_) { | |
| return Status::Invalid("lookup_levels_ is null"); | |
| } |
| // TODDO(xinyu.lxy): invalid cache | ||
| lookup_file_cache_.clear(); |
There was a problem hiding this comment.
Typo in comment: TODDO should be TODO.
| #pragma once | ||
|
|
||
| #include <memory> | ||
| #include <optional> | ||
| #include <utility> | ||
|
|
||
| #include "paimon/core/deletionvectors/bucketed_dv_maintainer.h" | ||
| #include "paimon/core/key_value.h" | ||
| #include "paimon/core/mergetree/compact/lookup_merge_function.h" | ||
| #include "paimon/core/mergetree/compact/merge_function_wrapper.h" | ||
| #include "paimon/core/mergetree/lookup/file_position.h" | ||
| #include "paimon/core/mergetree/lookup/positioned_key_value.h" | ||
| #include "paimon/core/options/lookup_strategy.h" | ||
| #include "paimon/core/utils/fields_comparator.h" | ||
| #include "paimon/result.h" | ||
| #include "paimon/status.h" | ||
| namespace paimon { |
There was a problem hiding this comment.
This header relies on transitive includes for several standard library types it uses (std::function, std::is_same_v, std::string, and std::sort). Please include the corresponding standard headers (e.g., <functional>, <type_traits>, <string>, <algorithm>) to avoid fragile builds when upstream includes change.
| #include <memory> | ||
| #include <optional> | ||
| #include <utility> | ||
|
|
||
| #include "paimon/core/key_value.h" | ||
| #include "paimon/core/mergetree/compact/first_row_merge_function.h" | ||
| #include "paimon/core/mergetree/compact/merge_function_wrapper.h" | ||
| #include "paimon/result.h" | ||
| #include "paimon/status.h" | ||
|
|
||
| namespace paimon { | ||
| /// Wrapper for `MergeFunction`s to produce changelog by lookup for first row. | ||
| class FirstRowMergeFunctionWrapper : public MergeFunctionWrapper<KeyValue> { | ||
| public: | ||
| FirstRowMergeFunctionWrapper( | ||
| std::unique_ptr<FirstRowMergeFunction>&& merge_function, | ||
| std::function<Result<bool>(const std::shared_ptr<InternalRow>&)> contains) | ||
| : merge_function_(std::move(merge_function)), contains_(std::move(contains)) {} |
There was a problem hiding this comment.
This header uses std::function and assert() but does not include <functional> / <cassert> directly. Please add the needed standard includes to avoid relying on transitive includes.
| #pragma once | ||
|
|
||
| #include "paimon/core/core_options.h" | ||
| #include "paimon/core/schema/table_schema.h" | ||
| #include "paimon/core/utils/file_store_path_factory.h" | ||
| #include "paimon/memory/memory_pool.h" | ||
| #include "paimon/result.h" | ||
|
|
There was a problem hiding this comment.
This header uses std::map, std::vector, and std::string but doesn't include the corresponding standard headers. Add explicit includes (e.g., <map>, <vector>, <string>) to avoid depending on transitive includes from other project headers.
| ASSERT_OK_AND_ASSIGN(auto runs, GenerateSortedRuns({file0, file1})); | ||
|
|
||
| // When output_level is 0, RewriteLookupChangelog should return false | ||
| // This tests the condition at line 59 in changelog_merge_tree_rewriter.cpp |
Purpose
support LookupMergeTreeCompactRewriter for Rewrite process with lookup
Linked issue: #93
Tests
src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter_test.cpp