Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build_support/lint_exclusions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
*vendored/*
*RcppExports.cpp*
*arrowExports.cpp*
*src/paimon/testing/utils/test_helper.h
9 changes: 9 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,15 @@ struct PAIMON_EXPORT Options {
/// Default value is false.
static const char DELETION_VECTORS_ENABLED[];

/// "deletion-vector.index-file.target-size" - The target size of deletion vector index file.
/// Default value is 2MB.
static const char DELETION_VECTOR_INDEX_FILE_TARGET_SIZE[];

/// "deletion-vectors.bitmap64" - Enable 64 bit bitmap implementation. Note that only 64 bit
/// bitmap implementation is compatible with Iceberg. Default value is "false".
/// @note: bitmap64 dv is not supported.
static const char DELETION_VECTOR_BITMAP64[];

/// @note `CHANGELOG_PRODUCER` currently only support `none`
///
/// "changelog-producer" - Whether to double write to a changelog file. This changelog file
Expand Down
11 changes: 11 additions & 0 deletions include/paimon/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ enum class StatusCode : char {
IOError = 5,
CapacityError = 6,
IndexError = 7,
Cancelled = 8,
UnknownError = 9,
NotImplemented = 10,
SerializationError = 11,
Expand Down Expand Up @@ -176,6 +177,12 @@ class PAIMON_MUST_USE_TYPE PAIMON_EXPORT Status : public util::EqualityComparabl
return Status::FromArgs(StatusCode::IndexError, std::forward<Args>(args)...);
}

/// Return an error status for cancelled operation
template <typename... Args>
static Status Cancelled(Args&&... args) {
return Status::FromArgs(StatusCode::Cancelled, std::forward<Args>(args)...);
}

/// Return an error status when a container's capacity would exceed its limits
template <typename... Args>
static Status CapacityError(Args&&... args) {
Expand Down Expand Up @@ -223,6 +230,10 @@ class PAIMON_MUST_USE_TYPE PAIMON_EXPORT Status : public util::EqualityComparabl
bool IsInvalid() const {
return code() == StatusCode::Invalid;
}
/// Return true iff the status indicates a cancelled operation.
bool IsCancelled() const {
return code() == StatusCode::Cancelled;
}
/// Return true iff the status indicates an IO-related failure.
bool IsIOError() const {
return code() == StatusCode::IOError;
Expand Down
5 changes: 5 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -504,11 +504,14 @@ if(PAIMON_BUILD_TESTS)
core/catalog/file_system_catalog_test.cpp
core/catalog/catalog_test.cpp
core/catalog/identifier_test.cpp
core/compact/compact_deletion_file_test.cpp
core/core_options_test.cpp
core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp
core/deletionvectors/bitmap_deletion_vector_test.cpp
core/deletionvectors/bucketed_dv_maintainer_test.cpp
core/deletionvectors/deletion_file_writer_test.cpp
core/deletionvectors/deletion_vector_test.cpp
core/deletionvectors/deletion_vector_index_file_writer_test.cpp
core/deletionvectors/deletion_vectors_index_file_test.cpp
core/index/index_in_data_file_dir_path_factory_test.cpp
core/index/deletion_vector_meta_test.cpp
Expand Down Expand Up @@ -540,6 +543,7 @@ if(PAIMON_BUILD_TESTS)
core/manifest/partition_entry_test.cpp
core/manifest/file_entry_test.cpp
core/manifest/index_manifest_entry_serializer_test.cpp
core/manifest/index_manifest_file_handler_test.cpp
core/mergetree/levels_test.cpp
core/mergetree/lookup_file_test.cpp
core/mergetree/lookup_levels_test.cpp
Expand Down Expand Up @@ -586,6 +590,7 @@ if(PAIMON_BUILD_TESTS)
core/operation/append_only_file_store_scan_test.cpp
core/operation/key_value_file_store_scan_test.cpp
core/operation/file_store_scan_test.cpp
core/operation/file_system_write_restore_test.cpp
core/operation/file_store_write_test.cpp
core/operation/manifest_file_merger_test.cpp
core/operation/merge_file_split_read_test.cpp
Expand Down
14 changes: 14 additions & 0 deletions src/paimon/common/data/binary_row.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <sstream>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <variant>
#include <vector>
Expand Down Expand Up @@ -163,6 +164,19 @@ struct hash<std::pair<paimon::BinaryRow, int32_t>> {
}
};

/// for std::unordered_map<std::tuple<paimon::BinaryRow, int32_t, std::string>, ...>
template <>
struct hash<std::tuple<paimon::BinaryRow, int32_t, std::string>> {
size_t operator()(
const std::tuple<paimon::BinaryRow, int32_t, std::string>& partition_bucket_type) const {
const auto& [partition, bucket, index_type] = partition_bucket_type;
size_t hash = paimon::MurmurHashUtils::HashUnsafeBytes(
reinterpret_cast<const void*>(&bucket), 0, sizeof(bucket), partition.HashCode());
return paimon::MurmurHashUtils::HashUnsafeBytes(index_type.data(), 0, index_type.size(),
hash);
}
};

template <>
struct hash<paimon::BinaryRow> {
size_t operator()(const paimon::BinaryRow& row) const {
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ const char Options::SORT_ENGINE[] = "sort-engine";
const char Options::IGNORE_DELETE[] = "ignore-delete";
const char Options::FIELDS_DEFAULT_AGG_FUNC[] = "fields.default-aggregate-function";
const char Options::DELETION_VECTORS_ENABLED[] = "deletion-vectors.enabled";
const char Options::DELETION_VECTOR_INDEX_FILE_TARGET_SIZE[] =
"deletion-vector.index-file.target-size";
const char Options::DELETION_VECTOR_BITMAP64[] = "deletion-vectors.bitmap64";
const char Options::CHANGELOG_PRODUCER[] = "changelog-producer";
const char Options::FORCE_LOOKUP[] = "force-lookup";
const char Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE[] =
Expand Down
5 changes: 4 additions & 1 deletion src/paimon/common/utils/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ std::string Status::CodeAsString(StatusCode code) {
case StatusCode::IndexError:
type = "Index error";
break;
case StatusCode::Cancelled:
type = "Cancelled";
break;
case StatusCode::UnknownError:
type = "Unknown error";
break;
Expand Down Expand Up @@ -129,7 +132,7 @@ void Status::Abort() const {
}

void Status::Abort(const std::string& message) const {
std::cerr << "-- Arrow Fatal Error --\n";
std::cerr << "-- Paimon Fatal Error --\n";
if (!message.empty()) {
std::cerr << message << "\n";
}
Expand Down
11 changes: 10 additions & 1 deletion src/paimon/core/append/bucketed_append_compact_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ BucketedAppendCompactManager::BucketedAppendCompactManager(
const std::vector<std::shared_ptr<DataFileMeta>>& restored,
const std::shared_ptr<BucketedDvMaintainer>& dv_maintainer, int32_t min_file_num,
int64_t target_file_size, int64_t compaction_file_size, bool force_rewrite_all_files,
CompactRewriter rewriter, const std::shared_ptr<CompactionMetrics::Reporter>& reporter)
CompactRewriter rewriter, const std::shared_ptr<CompactionMetrics::Reporter>& reporter,
const std::shared_ptr<std::atomic_bool>& cancel_flag)
: executor_(executor),
dv_maintainer_(dv_maintainer),
min_file_num_(min_file_num),
Expand All @@ -38,12 +39,18 @@ BucketedAppendCompactManager::BucketedAppendCompactManager(
[](const std::shared_ptr<DataFileMeta>& lhs, const std::shared_ptr<DataFileMeta>& rhs) {
return lhs->min_sequence_number > rhs->min_sequence_number;
}),
cancel_flag_(cancel_flag ? cancel_flag : std::make_shared<std::atomic_bool>(false)),
logger_(Logger::GetLogger("BucketedAppendCompactManager")) {
for (const auto& file : restored) {
to_compact_.push(file);
}
}

void BucketedAppendCompactManager::CancelCompaction() {
cancel_flag_->store(true, std::memory_order_relaxed);
CompactFutureManager::CancelCompaction();
}

Status BucketedAppendCompactManager::TriggerCompaction(bool full_compaction) {
if (full_compaction) {
PAIMON_RETURN_NOT_OK(TriggerFullCompaction());
Expand Down Expand Up @@ -71,6 +78,7 @@ Status BucketedAppendCompactManager::TriggerFullCompaction() {
compacting.push_back(to_compact_.top());
to_compact_.pop();
}
cancel_flag_->store(false, std::memory_order_relaxed);
auto compact_task = std::make_shared<FullCompactTask>(reporter_, dv_maintainer_, compacting,
compaction_file_size_,
force_rewrite_all_files_, rewriter_);
Expand All @@ -87,6 +95,7 @@ void BucketedAppendCompactManager::TriggerCompactionWithBestEffort() {
}
std::optional<std::vector<std::shared_ptr<DataFileMeta>>> picked = PickCompactBefore();
if (picked) {
cancel_flag_->store(false, std::memory_order_relaxed);
compacting_ = picked.value();
auto compact_task = std::make_shared<AutoCompactTask>(reporter_, dv_maintainer_,
compacting_.value(), rewriter_);
Expand Down
7 changes: 6 additions & 1 deletion src/paimon/core/append/bucketed_append_compact_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <atomic>
#include <deque>
#include <functional>
#include <memory>
Expand Down Expand Up @@ -72,9 +73,12 @@ class BucketedAppendCompactManager : public CompactFutureManager {
int32_t min_file_num, int64_t target_file_size,
int64_t compaction_file_size, bool force_rewrite_all_files,
CompactRewriter rewriter,
const std::shared_ptr<CompactionMetrics::Reporter>& reporter);
const std::shared_ptr<CompactionMetrics::Reporter>& reporter,
const std::shared_ptr<std::atomic_bool>& cancel_flag);
~BucketedAppendCompactManager() override = default;

void CancelCompaction() override;

Status TriggerCompaction(bool full_compaction) override;

bool ShouldWaitForLatestCompaction() const override {
Expand Down Expand Up @@ -195,6 +199,7 @@ class BucketedAppendCompactManager : public CompactFutureManager {
std::shared_ptr<CompactionMetrics::Reporter> reporter_;
std::optional<std::vector<std::shared_ptr<DataFileMeta>>> compacting_;
DataFileMetaPriorityQueue to_compact_;
std::shared_ptr<std::atomic_bool> cancel_flag_;
std::unique_ptr<Logger> logger_;
};

Expand Down
56 changes: 55 additions & 1 deletion src/paimon/core/append/bucketed_append_compact_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

#include "paimon/core/append/bucketed_append_compact_manager.h"

#include <atomic>
#include <chrono>
#include <future>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

Expand All @@ -27,6 +31,7 @@
#include "paimon/core/stats/simple_stats.h"
#include "paimon/executor.h"
#include "paimon/result.h"
#include "paimon/testing/utils/testharness.h"

namespace paimon::test {

Expand Down Expand Up @@ -70,7 +75,8 @@ class BucketedAppendCompactManagerTest : public testing::Test {
BucketedAppendCompactManager manager(
executor_, to_compact_before_pick,
/*dv_maintainer=*/nullptr, min_file_num, target_file_size, threshold,
/*force_rewrite_all_files=*/false, /*rewriter=*/nullptr, /*reporter=*/nullptr);
/*force_rewrite_all_files=*/false, /*rewriter=*/nullptr, /*reporter=*/nullptr,
/*cancel_flag=*/std::make_shared<std::atomic_bool>(false));
auto actual = manager.PickCompactBefore();
if (expected_present) {
ASSERT_TRUE(actual.has_value());
Expand Down Expand Up @@ -260,4 +266,52 @@ TEST_F(BucketedAppendCompactManagerTest, TestPick) {
NewFile(2601, 2610), NewFile(2611, 2620), NewFile(2621, 2630)});
}

TEST_F(BucketedAppendCompactManagerTest, TestCancelCompactionPropagatesToRewriteLoop) {
auto cancel_flag = std::make_shared<std::atomic_bool>(false);
auto exit_signal = std::make_shared<std::promise<void>>();
auto exit_future = exit_signal->get_future();

auto rewriter = [cancel_flag,
exit_signal](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
-> Result<std::vector<std::shared_ptr<DataFileMeta>>> {
while (!cancel_flag->load(std::memory_order_relaxed)) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
exit_signal->set_value();
return Status::Invalid("compaction cancelled in rewrite loop");
};

BucketedAppendCompactManager manager(
executor_, {NewFile(1, 100), NewFile(101, 200), NewFile(201, 300), NewFile(301, 400)},
/*dv_maintainer=*/nullptr,
/*min_file_num=*/4,
/*target_file_size=*/1024,
/*compaction_file_size=*/700,
/*force_rewrite_all_files=*/false, rewriter,
/*reporter=*/nullptr, cancel_flag);

ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));
manager.CancelCompaction();

EXPECT_EQ(exit_future.wait_for(std::chrono::seconds(1)), std::future_status::ready);
}

TEST_F(BucketedAppendCompactManagerTest, TestTriggerCompactionResetsCancelFlag) {
auto cancel_flag = std::make_shared<std::atomic_bool>(true);
auto rewriter = [](const std::vector<std::shared_ptr<DataFileMeta>>& to_compact)
-> Result<std::vector<std::shared_ptr<DataFileMeta>>> { return to_compact; };

BucketedAppendCompactManager manager(
executor_, {NewFile(1, 100), NewFile(101, 200), NewFile(201, 300), NewFile(301, 400)},
/*dv_maintainer=*/nullptr,
/*min_file_num=*/4,
/*target_file_size=*/1024,
/*compaction_file_size=*/700,
/*force_rewrite_all_files=*/false, rewriter,
/*reporter=*/nullptr, cancel_flag);

ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true));
EXPECT_FALSE(cancel_flag->load(std::memory_order_relaxed));
}

} // namespace paimon::test
5 changes: 5 additions & 0 deletions src/paimon/core/compact/compact_deletion_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,14 @@

namespace paimon {

/// Deletion File from compaction.
class CompactDeletionFile {
public:
virtual ~CompactDeletionFile() = default;

/// Used by async compaction, when compaction task is completed, deletions file will be
/// generated immediately, so when updateCompactResult, we need to merge old deletion files
/// (just delete them).
static Result<std::shared_ptr<CompactDeletionFile>> GenerateFiles(
const std::shared_ptr<BucketedDvMaintainer>& maintainer);

Expand All @@ -41,6 +45,7 @@ class CompactDeletionFile {
virtual void Clean() = 0;
};

/// A generated files implementation of `CompactDeletionFile`.
class GeneratedDeletionFile : public CompactDeletionFile,
public std::enable_shared_from_this<GeneratedDeletionFile> {
public:
Expand Down
Loading
Loading