Skip to content

Commit 65c72f3

Browse files
authored
feat(compaction): support append table compaction with dv (#177)
1 parent 28b6b09 commit 65c72f3

70 files changed

Lines changed: 2118 additions & 416 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

build_support/lint_exclusions.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@
1010
*vendored/*
1111
*RcppExports.cpp*
1212
*arrowExports.cpp*
13+
*src/paimon/testing/utils/test_helper.h

include/paimon/defs.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,15 @@ struct PAIMON_EXPORT Options {
232232
/// Default value is false.
233233
static const char DELETION_VECTORS_ENABLED[];
234234

235+
/// "deletion-vector.index-file.target-size" - The target size of deletion vector index file.
236+
/// Default value is 2MB.
237+
static const char DELETION_VECTOR_INDEX_FILE_TARGET_SIZE[];
238+
239+
/// "deletion-vectors.bitmap64" - Enable 64 bit bitmap implementation. Note that only 64 bit
240+
/// bitmap implementation is compatible with Iceberg. Default value is "false".
241+
/// @note: bitmap64 dv is not supported.
242+
static const char DELETION_VECTOR_BITMAP64[];
243+
235244
/// @note `CHANGELOG_PRODUCER` currently only support `none`
236245
///
237246
/// "changelog-producer" - Whether to double write to a changelog file. This changelog file

include/paimon/status.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ enum class StatusCode : char {
5353
IOError = 5,
5454
CapacityError = 6,
5555
IndexError = 7,
56+
Cancelled = 8,
5657
UnknownError = 9,
5758
NotImplemented = 10,
5859
SerializationError = 11,
@@ -176,6 +177,12 @@ class PAIMON_MUST_USE_TYPE PAIMON_EXPORT Status : public util::EqualityComparabl
176177
return Status::FromArgs(StatusCode::IndexError, std::forward<Args>(args)...);
177178
}
178179

180+
/// Return an error status for cancelled operation
181+
template <typename... Args>
182+
static Status Cancelled(Args&&... args) {
183+
return Status::FromArgs(StatusCode::Cancelled, std::forward<Args>(args)...);
184+
}
185+
179186
/// Return an error status when a container's capacity would exceed its limits
180187
template <typename... Args>
181188
static Status CapacityError(Args&&... args) {
@@ -223,6 +230,10 @@ class PAIMON_MUST_USE_TYPE PAIMON_EXPORT Status : public util::EqualityComparabl
223230
bool IsInvalid() const {
224231
return code() == StatusCode::Invalid;
225232
}
233+
/// Return true iff the status indicates a cancelled operation.
234+
bool IsCancelled() const {
235+
return code() == StatusCode::Cancelled;
236+
}
226237
/// Return true iff the status indicates an IO-related failure.
227238
bool IsIOError() const {
228239
return code() == StatusCode::IOError;

src/paimon/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,11 +504,14 @@ if(PAIMON_BUILD_TESTS)
504504
core/catalog/file_system_catalog_test.cpp
505505
core/catalog/catalog_test.cpp
506506
core/catalog/identifier_test.cpp
507+
core/compact/compact_deletion_file_test.cpp
507508
core/core_options_test.cpp
508509
core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp
509510
core/deletionvectors/bitmap_deletion_vector_test.cpp
510511
core/deletionvectors/bucketed_dv_maintainer_test.cpp
512+
core/deletionvectors/deletion_file_writer_test.cpp
511513
core/deletionvectors/deletion_vector_test.cpp
514+
core/deletionvectors/deletion_vector_index_file_writer_test.cpp
512515
core/deletionvectors/deletion_vectors_index_file_test.cpp
513516
core/index/index_in_data_file_dir_path_factory_test.cpp
514517
core/index/deletion_vector_meta_test.cpp
@@ -540,6 +543,7 @@ if(PAIMON_BUILD_TESTS)
540543
core/manifest/partition_entry_test.cpp
541544
core/manifest/file_entry_test.cpp
542545
core/manifest/index_manifest_entry_serializer_test.cpp
546+
core/manifest/index_manifest_file_handler_test.cpp
543547
core/mergetree/levels_test.cpp
544548
core/mergetree/lookup_file_test.cpp
545549
core/mergetree/lookup_levels_test.cpp
@@ -586,6 +590,7 @@ if(PAIMON_BUILD_TESTS)
586590
core/operation/append_only_file_store_scan_test.cpp
587591
core/operation/key_value_file_store_scan_test.cpp
588592
core/operation/file_store_scan_test.cpp
593+
core/operation/file_system_write_restore_test.cpp
589594
core/operation/file_store_write_test.cpp
590595
core/operation/manifest_file_merger_test.cpp
591596
core/operation/merge_file_split_read_test.cpp

src/paimon/common/data/binary_row.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <sstream>
2424
#include <string>
2525
#include <string_view>
26+
#include <tuple>
2627
#include <utility>
2728
#include <variant>
2829
#include <vector>
@@ -163,6 +164,19 @@ struct hash<std::pair<paimon::BinaryRow, int32_t>> {
163164
}
164165
};
165166

167+
/// for std::unordered_map<std::tuple<paimon::BinaryRow, int32_t, std::string>, ...>
168+
template <>
169+
struct hash<std::tuple<paimon::BinaryRow, int32_t, std::string>> {
170+
size_t operator()(
171+
const std::tuple<paimon::BinaryRow, int32_t, std::string>& partition_bucket_type) const {
172+
const auto& [partition, bucket, index_type] = partition_bucket_type;
173+
size_t hash = paimon::MurmurHashUtils::HashUnsafeBytes(
174+
reinterpret_cast<const void*>(&bucket), 0, sizeof(bucket), partition.HashCode());
175+
return paimon::MurmurHashUtils::HashUnsafeBytes(index_type.data(), 0, index_type.size(),
176+
hash);
177+
}
178+
};
179+
166180
template <>
167181
struct hash<paimon::BinaryRow> {
168182
size_t operator()(const paimon::BinaryRow& row) const {

src/paimon/common/defs.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ const char Options::SORT_ENGINE[] = "sort-engine";
6363
const char Options::IGNORE_DELETE[] = "ignore-delete";
6464
const char Options::FIELDS_DEFAULT_AGG_FUNC[] = "fields.default-aggregate-function";
6565
const char Options::DELETION_VECTORS_ENABLED[] = "deletion-vectors.enabled";
66+
const char Options::DELETION_VECTOR_INDEX_FILE_TARGET_SIZE[] =
67+
"deletion-vector.index-file.target-size";
68+
const char Options::DELETION_VECTOR_BITMAP64[] = "deletion-vectors.bitmap64";
6669
const char Options::CHANGELOG_PRODUCER[] = "changelog-producer";
6770
const char Options::FORCE_LOOKUP[] = "force-lookup";
6871
const char Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE[] =

src/paimon/common/utils/status.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ std::string Status::CodeAsString(StatusCode code) {
9393
case StatusCode::IndexError:
9494
type = "Index error";
9595
break;
96+
case StatusCode::Cancelled:
97+
type = "Cancelled";
98+
break;
9699
case StatusCode::UnknownError:
97100
type = "Unknown error";
98101
break;
@@ -129,7 +132,7 @@ void Status::Abort() const {
129132
}
130133

131134
void Status::Abort(const std::string& message) const {
132-
std::cerr << "-- Arrow Fatal Error --\n";
135+
std::cerr << "-- Paimon Fatal Error --\n";
133136
if (!message.empty()) {
134137
std::cerr << message << "\n";
135138
}

src/paimon/core/append/bucketed_append_compact_manager.cpp

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ BucketedAppendCompactManager::BucketedAppendCompactManager(
2525
const std::vector<std::shared_ptr<DataFileMeta>>& restored,
2626
const std::shared_ptr<BucketedDvMaintainer>& dv_maintainer, int32_t min_file_num,
2727
int64_t target_file_size, int64_t compaction_file_size, bool force_rewrite_all_files,
28-
CompactRewriter rewriter, const std::shared_ptr<CompactionMetrics::Reporter>& reporter)
28+
CompactRewriter rewriter, const std::shared_ptr<CompactionMetrics::Reporter>& reporter,
29+
const std::shared_ptr<std::atomic_bool>& cancel_flag)
2930
: executor_(executor),
3031
dv_maintainer_(dv_maintainer),
3132
min_file_num_(min_file_num),
@@ -38,12 +39,18 @@ BucketedAppendCompactManager::BucketedAppendCompactManager(
3839
[](const std::shared_ptr<DataFileMeta>& lhs, const std::shared_ptr<DataFileMeta>& rhs) {
3940
return lhs->min_sequence_number > rhs->min_sequence_number;
4041
}),
42+
cancel_flag_(cancel_flag ? cancel_flag : std::make_shared<std::atomic_bool>(false)),
4143
logger_(Logger::GetLogger("BucketedAppendCompactManager")) {
4244
for (const auto& file : restored) {
4345
to_compact_.push(file);
4446
}
4547
}
4648

49+
void BucketedAppendCompactManager::CancelCompaction() {
50+
cancel_flag_->store(true, std::memory_order_relaxed);
51+
CompactFutureManager::CancelCompaction();
52+
}
53+
4754
Status BucketedAppendCompactManager::TriggerCompaction(bool full_compaction) {
4855
if (full_compaction) {
4956
PAIMON_RETURN_NOT_OK(TriggerFullCompaction());
@@ -71,6 +78,7 @@ Status BucketedAppendCompactManager::TriggerFullCompaction() {
7178
compacting.push_back(to_compact_.top());
7279
to_compact_.pop();
7380
}
81+
cancel_flag_->store(false, std::memory_order_relaxed);
7482
auto compact_task = std::make_shared<FullCompactTask>(reporter_, dv_maintainer_, compacting,
7583
compaction_file_size_,
7684
force_rewrite_all_files_, rewriter_);
@@ -87,6 +95,7 @@ void BucketedAppendCompactManager::TriggerCompactionWithBestEffort() {
8795
}
8896
std::optional<std::vector<std::shared_ptr<DataFileMeta>>> picked = PickCompactBefore();
8997
if (picked) {
98+
cancel_flag_->store(false, std::memory_order_relaxed);
9099
compacting_ = picked.value();
91100
auto compact_task = std::make_shared<AutoCompactTask>(reporter_, dv_maintainer_,
92101
compacting_.value(), rewriter_);

src/paimon/core/append/bucketed_append_compact_manager.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#pragma once
1818

19+
#include <atomic>
1920
#include <deque>
2021
#include <functional>
2122
#include <memory>
@@ -72,9 +73,12 @@ class BucketedAppendCompactManager : public CompactFutureManager {
7273
int32_t min_file_num, int64_t target_file_size,
7374
int64_t compaction_file_size, bool force_rewrite_all_files,
7475
CompactRewriter rewriter,
75-
const std::shared_ptr<CompactionMetrics::Reporter>& reporter);
76+
const std::shared_ptr<CompactionMetrics::Reporter>& reporter,
77+
const std::shared_ptr<std::atomic_bool>& cancel_flag);
7678
~BucketedAppendCompactManager() override = default;
7779

80+
void CancelCompaction() override;
81+
7882
Status TriggerCompaction(bool full_compaction) override;
7983

8084
bool ShouldWaitForLatestCompaction() const override {
@@ -195,6 +199,7 @@ class BucketedAppendCompactManager : public CompactFutureManager {
195199
std::shared_ptr<CompactionMetrics::Reporter> reporter_;
196200
std::optional<std::vector<std::shared_ptr<DataFileMeta>>> compacting_;
197201
DataFileMetaPriorityQueue to_compact_;
202+
std::shared_ptr<std::atomic_bool> cancel_flag_;
198203
std::unique_ptr<Logger> logger_;
199204
};
200205

src/paimon/core/append/bucketed_append_compact_manager_test.cpp

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,12 @@
1616

1717
#include "paimon/core/append/bucketed_append_compact_manager.h"
1818

19+
#include <atomic>
20+
#include <chrono>
21+
#include <future>
1922
#include <optional>
2023
#include <string>
24+
#include <thread>
2125
#include <utility>
2226
#include <vector>
2327

@@ -27,6 +31,7 @@
2731
#include "paimon/core/stats/simple_stats.h"
2832
#include "paimon/executor.h"
2933
#include "paimon/result.h"
34+
#include "paimon/testing/utils/testharness.h"
3035

3136
namespace paimon::test {
3237

@@ -70,7 +75,8 @@ class BucketedAppendCompactManagerTest : public testing::Test {
7075
BucketedAppendCompactManager manager(
7176
executor_, to_compact_before_pick,
7277
/*dv_maintainer=*/nullptr, min_file_num, target_file_size, threshold,
73-
/*force_rewrite_all_files=*/false, /*rewriter=*/nullptr, /*reporter=*/nullptr);
78+
/*force_rewrite_all_files=*/false, /*rewriter=*/nullptr, /*reporter=*/nullptr,
79+
/*cancel_flag=*/std::make_shared<std::atomic_bool>(false));
7480
auto actual = manager.PickCompactBefore();
7581
if (expected_present) {
7682
ASSERT_TRUE(actual.has_value());
@@ -260,4 +266,52 @@ TEST_F(BucketedAppendCompactManagerTest, TestPick) {
260266
NewFile(2601, 2610), NewFile(2611, 2620), NewFile(2621, 2630)});
261267
}
262268

269+
TEST_F(BucketedAppendCompactManagerTest, TestCancelCompactionPropagatesToRewriteLoop) {
270+
auto cancel_flag = std::make_shared<std::atomic_bool>(false);
271+
auto exit_signal = std::make_shared<std::promise<void>>();
272+
auto exit_future = exit_signal->get_future();
273+
274+
auto rewriter = [cancel_flag,
275+
exit_signal](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
276+
-> Result<std::vector<std::shared_ptr<DataFileMeta>>> {
277+
while (!cancel_flag->load(std::memory_order_relaxed)) {
278+
std::this_thread::sleep_for(std::chrono::milliseconds(1));
279+
}
280+
exit_signal->set_value();
281+
return Status::Invalid("compaction cancelled in rewrite loop");
282+
};
283+
284+
BucketedAppendCompactManager manager(
285+
executor_, {NewFile(1, 100), NewFile(101, 200), NewFile(201, 300), NewFile(301, 400)},
286+
/*dv_maintainer=*/nullptr,
287+
/*min_file_num=*/4,
288+
/*target_file_size=*/1024,
289+
/*compaction_file_size=*/700,
290+
/*force_rewrite_all_files=*/false, rewriter,
291+
/*reporter=*/nullptr, cancel_flag);
292+
293+
ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));
294+
manager.CancelCompaction();
295+
296+
EXPECT_EQ(exit_future.wait_for(std::chrono::seconds(1)), std::future_status::ready);
297+
}
298+
299+
TEST_F(BucketedAppendCompactManagerTest, TestTriggerCompactionResetsCancelFlag) {
300+
auto cancel_flag = std::make_shared<std::atomic_bool>(true);
301+
auto rewriter = [](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
302+
-> Result<std::vector<std::shared_ptr<DataFileMeta>>> { return to_compact; };
303+
304+
BucketedAppendCompactManager manager(
305+
executor_, {NewFile(1, 100), NewFile(101, 200), NewFile(201, 300), NewFile(301, 400)},
306+
/*dv_maintainer=*/nullptr,
307+
/*min_file_num=*/4,
308+
/*target_file_size=*/1024,
309+
/*compaction_file_size=*/700,
310+
/*force_rewrite_all_files=*/false, rewriter,
311+
/*reporter=*/nullptr, cancel_flag);
312+
313+
ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));
314+
EXPECT_FALSE(cancel_flag->load(std::memory_order_relaxed));
315+
}
316+
263317
} // namespace paimon::test

0 commit comments

Comments
 (0)