Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,42 @@ struct PAIMON_EXPORT Options {
/// "commit.max-retries" - Maximum number of retries when commit failed. Default value is 10.
static const char COMMIT_MAX_RETRIES[];

/// "compaction.max-size-amplification-percent" - The size amplification is defined as the
/// amount (in percentage) of additional storage needed to store a single byte of data in the
/// merge tree for changelog mode table. Default value is 200.
static const char COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT[];

/// "compaction.size-ratio" - Percentage flexibility while comparing sorted run size for
/// changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next
/// sorted run's size, then include next sorted run into this candidate set. Default value is 1.
static const char COMPACTION_SIZE_RATIO[];

/// "num-sorted-run.compaction-trigger" - The sorted run number to trigger compaction. Includes
/// level0 files (one file one sorted run) and high-level runs (one level one sorted run).
/// Default value is 5.
static const char NUM_SORTED_RUNS_COMPACTION_TRIGGER[];

/// "num-sorted-run.stop-trigger" - The number of sorted runs that trigger the stopping of
/// writes, the default value is 'num-sorted-run.compaction-trigger' + 3.
static const char NUM_SORTED_RUNS_STOP_TRIGGER[];

/// "num-levels" - Total level number, for example, there are 3 levels, including 0,1,2 levels.
static const char NUM_LEVELS[];

/// "lookup-compact" - Lookup compact mode used for lookup compaction. Default value is
/// LookupCompactMode::RADICAL.
static const char LOOKUP_COMPACT[];

/// "compaction.force-up-level-0" - If set to true, compaction strategy will always include all
/// level 0 files in candidates. Default value is false.
static const char COMPACTION_FORCE_UP_LEVEL_0[];

/// "lookup-compact.max-interval" - The max interval for a gentle mode lookup compaction to be
/// triggered. For every interval, a forced lookup compaction will be performed to flush L0
/// files to higher level. This option is only valid when lookup-compact mode is gentle. No
/// default value.
static const char LOOKUP_COMPACT_MAX_INTERVAL[];

/// "sequence.field" - The field that generates the sequence number for primary key table, the
/// sequence number determines which data is the most recent. Value use "," as delimiter.
static const char SEQUENCE_FIELD[];
Expand Down
4 changes: 4 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ set(PAIMON_CORE_SRCS
core/mergetree/compact/aggregate/field_sum_agg.cpp
core/mergetree/compact/interval_partition.cpp
core/mergetree/compact/loser_tree.cpp
core/mergetree/compact/merge_tree_compact_manager.cpp
core/mergetree/compact/merge_tree_compact_rewriter.cpp
core/mergetree/compact/merge_tree_compact_task.cpp
core/mergetree/compact/partial_update_merge_function.cpp
core/mergetree/compact/sort_merge_reader_with_loser_tree.cpp
core/mergetree/compact/sort_merge_reader_with_min_heap.cpp
Expand Down Expand Up @@ -506,6 +508,7 @@ if(PAIMON_BUILD_TESTS)
core/catalog/identifier_test.cpp
core/compact/compact_deletion_file_test.cpp
core/core_options_test.cpp
core/options/lookup_strategy_test.cpp
core/deletionvectors/apply_deletion_vector_batch_reader_test.cpp
core/deletionvectors/bitmap_deletion_vector_test.cpp
core/deletionvectors/bucketed_dv_maintainer_test.cpp
Expand Down Expand Up @@ -570,6 +573,7 @@ if(PAIMON_BUILD_TESTS)
core/mergetree/compact/universal_compaction_test.cpp
core/mergetree/compact/force_up_level0_compaction_test.cpp
core/mergetree/compact/compact_strategy_test.cpp
core/mergetree/compact/merge_tree_compact_manager_test.cpp
core/mergetree/compact/merge_tree_compact_rewriter_test.cpp
core/mergetree/lookup/persist_processor_test.cpp
core/mergetree/drop_delete_reader_test.cpp
Expand Down
10 changes: 10 additions & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,14 @@ const char Options::LOOKUP_CACHE_BLOOM_FILTER_FPP[] = "lookup.cache.bloom.filter
const char Options::LOOKUP_CACHE_SPILL_COMPRESSION[] = "lookup.cache-spill-compression";
const char Options::SPILL_COMPRESSION_ZSTD_LEVEL[] = "spill-compression.zstd-level";
const char Options::CACHE_PAGE_SIZE[] = "cache-page-size";
const char Options::COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT[] =
"compaction.max-size-amplification-percent";
const char Options::COMPACTION_SIZE_RATIO[] = "compaction.size-ratio";
const char Options::NUM_SORTED_RUNS_COMPACTION_TRIGGER[] = "num-sorted-run.compaction-trigger";
const char Options::NUM_SORTED_RUNS_STOP_TRIGGER[] = "num-sorted-run.stop-trigger";
const char Options::NUM_LEVELS[] = "num-levels";
const char Options::COMPACTION_FORCE_UP_LEVEL_0[] = "compaction.force-up-level-0";
const char Options::LOOKUP_COMPACT[] = "lookup-compact";
const char Options::LOOKUP_COMPACT_MAX_INTERVAL[] = "lookup-compact.max-interval";

} // namespace paimon
2 changes: 1 addition & 1 deletion src/paimon/core/append/append_only_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ Status AppendOnlyWriter::Flush(bool wait_for_latest_compaction, bool forced_full
}
// add new generated files
for (const auto& flushed_file : flushed_files) {
compact_manager_->AddNewFile(flushed_file);
PAIMON_RETURN_NOT_OK(compact_manager_->AddNewFile(flushed_file));
}
PAIMON_RETURN_NOT_OK(TrySyncLatestCompaction(wait_for_latest_compaction));
PAIMON_RETURN_NOT_OK(compact_manager_->TriggerCompaction(forced_full_compaction));
Expand Down
3 changes: 2 additions & 1 deletion src/paimon/core/append/bucketed_append_compact_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ class BucketedAppendCompactManager : public CompactFutureManager {
return false;
}

void AddNewFile(const std::shared_ptr<DataFileMeta>& file) override {
Status AddNewFile(const std::shared_ptr<DataFileMeta>& file) override {
to_compact_.push(file);
return Status::OK();
}

std::vector<std::shared_ptr<DataFileMeta>> AllFiles() const override;
Expand Down
46 changes: 44 additions & 2 deletions src/paimon/core/compact/compact_deletion_file.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@ class CompactDeletionFile {
static Result<std::shared_ptr<CompactDeletionFile>> GenerateFiles(
const std::shared_ptr<BucketedDvMaintainer>& maintainer);

virtual std::optional<std::shared_ptr<IndexFileMeta>> GetOrCompute() = 0;
/// For sync compaction, only create deletion files when prepare commit.
static Result<std::shared_ptr<CompactDeletionFile>> LazyGeneration(
const std::shared_ptr<BucketedDvMaintainer>& maintainer);

virtual Result<std::optional<std::shared_ptr<IndexFileMeta>>> GetOrCompute() = 0;

virtual Result<std::shared_ptr<CompactDeletionFile>> MergeOldFile(
const std::shared_ptr<CompactDeletionFile>& old) = 0;
Expand All @@ -53,7 +57,7 @@ class GeneratedDeletionFile : public CompactDeletionFile,
const std::shared_ptr<DeletionVectorsIndexFile>& dv_index_file)
: deletion_file_(deletion_file), dv_index_file_(dv_index_file) {}

std::optional<std::shared_ptr<IndexFileMeta>> GetOrCompute() override {
Result<std::optional<std::shared_ptr<IndexFileMeta>>> GetOrCompute() override {
get_invoked_ = true;
return deletion_file_ ? std::optional<std::shared_ptr<IndexFileMeta>>(deletion_file_)
: std::nullopt;
Expand Down Expand Up @@ -87,6 +91,39 @@ class GeneratedDeletionFile : public CompactDeletionFile,
bool get_invoked_ = false;
};

/// A lazy generation implementation of `CompactDeletionFile`.
class LazyCompactDeletionFile : public CompactDeletionFile,
public std::enable_shared_from_this<LazyCompactDeletionFile> {
public:
explicit LazyCompactDeletionFile(const std::shared_ptr<BucketedDvMaintainer>& maintainer)
: maintainer_(maintainer) {}

Result<std::optional<std::shared_ptr<IndexFileMeta>>> GetOrCompute() override {
generated_ = true;
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<CompactDeletionFile> generated,
CompactDeletionFile::GenerateFiles(maintainer_));
return generated->GetOrCompute();
}

Result<std::shared_ptr<CompactDeletionFile>> MergeOldFile(
const std::shared_ptr<CompactDeletionFile>& old) override {
auto derived = dynamic_cast<LazyCompactDeletionFile*>(old.get());
if (derived == nullptr) {
return Status::Invalid("old should be a LazyCompactDeletionFile, but it is not");
}
if (derived->generated_) {
return Status::Invalid("old should not be generated, this is a bug.");
}
return shared_from_this();
}

void Clean() override {}

private:
std::shared_ptr<BucketedDvMaintainer> maintainer_;
bool generated_ = false;
};

inline Result<std::shared_ptr<CompactDeletionFile>> CompactDeletionFile::GenerateFiles(
const std::shared_ptr<BucketedDvMaintainer>& maintainer) {
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<IndexFileMeta>> file,
Expand All @@ -95,4 +132,9 @@ inline Result<std::shared_ptr<CompactDeletionFile>> CompactDeletionFile::Generat
maintainer->DvIndexFile());
}

inline Result<std::shared_ptr<CompactDeletionFile>> CompactDeletionFile::LazyGeneration(
const std::shared_ptr<BucketedDvMaintainer>& maintainer) {
return std::make_shared<LazyCompactDeletionFile>(maintainer);
}

} // namespace paimon
81 changes: 76 additions & 5 deletions src/paimon/core/compact/compact_deletion_file_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class TestGeneratedDeletionFile : public GeneratedDeletionFile {

class NonGeneratedCompactDeletionFile : public CompactDeletionFile {
public:
std::optional<std::shared_ptr<IndexFileMeta>> GetOrCompute() override {
return std::nullopt;
Result<std::optional<std::shared_ptr<IndexFileMeta>>> GetOrCompute() override {
return std::optional<std::shared_ptr<IndexFileMeta>>();
}

Result<std::shared_ptr<CompactDeletionFile>> MergeOldFile(
Expand Down Expand Up @@ -90,7 +90,7 @@ TEST(CompactDeletionFileTest, GenerateFilesShouldReturnFileWhenModified) {

ASSERT_OK_AND_ASSIGN(std::shared_ptr<CompactDeletionFile> generated,
CompactDeletionFile::GenerateFiles(maintainer));
auto file = generated->GetOrCompute();
ASSERT_OK_AND_ASSIGN(auto file, generated->GetOrCompute());
ASSERT_TRUE(file.has_value());
ASSERT_NE(file.value(), nullptr);
ASSERT_EQ(file.value()->IndexType(), DeletionVectorsIndexFile::DELETION_VECTORS_INDEX);
Expand All @@ -108,7 +108,7 @@ TEST(CompactDeletionFileTest, GenerateFilesShouldReturnNulloptWhenNotModified) {

ASSERT_OK_AND_ASSIGN(std::shared_ptr<CompactDeletionFile> generated,
CompactDeletionFile::GenerateFiles(maintainer));
auto file = generated->GetOrCompute();
ASSERT_OK_AND_ASSIGN(auto file, generated->GetOrCompute());
ASSERT_FALSE(file.has_value());
}

Expand Down Expand Up @@ -149,7 +149,8 @@ TEST(CompactDeletionFileTest, MergeOldFileShouldRejectInvokedOldFile) {

auto current = std::make_shared<GeneratedDeletionFile>(current_meta, dv_index_file);
auto old = std::make_shared<GeneratedDeletionFile>(old_meta, dv_index_file);
ASSERT_TRUE(old->GetOrCompute().has_value());
ASSERT_OK_AND_ASSIGN(auto old_file, old->GetOrCompute());
ASSERT_TRUE(old_file.has_value());

ASSERT_NOK_WITH_MSG(current->MergeOldFile(old), "old should not be get");
}
Expand Down Expand Up @@ -227,4 +228,74 @@ TEST(CompactDeletionFileTest, CleanShouldDeleteIndexFile) {
ASSERT_FALSE(exists_after);
}

TEST(CompactDeletionFileTest, LazyGenerationShouldComputeWhenInvoked) {
auto dir = UniqueTestDirectory::Create();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
FileSystemFactory::Get("local", dir->Str(), {}));
auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
auto pool = GetDefaultPool();

RoaringBitmap32 roaring;
roaring.Add(1);
std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors;
deletion_vectors["data-a"] = std::make_shared<BitmapDeletionVector>(roaring);

auto maintainer = CreateMaintainer(fs, path_factory, pool, deletion_vectors);
maintainer->RemoveDeletionVectorOf("data-a");

ASSERT_OK_AND_ASSIGN(std::shared_ptr<CompactDeletionFile> lazy,
CompactDeletionFile::LazyGeneration(maintainer));
ASSERT_OK_AND_ASSIGN(auto file, lazy->GetOrCompute());
ASSERT_TRUE(file.has_value());
ASSERT_NE(file.value(), nullptr);
ASSERT_EQ(file.value()->IndexType(), DeletionVectorsIndexFile::DELETION_VECTORS_INDEX);
}

TEST(CompactDeletionFileTest, LazyMergeOldFileShouldRejectNonLazyType) {
auto dir = UniqueTestDirectory::Create();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
FileSystemFactory::Get("local", dir->Str(), {}));
auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
auto pool = GetDefaultPool();

std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors;
auto maintainer = CreateMaintainer(fs, path_factory, pool, deletion_vectors);

ASSERT_OK_AND_ASSIGN(std::shared_ptr<CompactDeletionFile> lazy,
CompactDeletionFile::LazyGeneration(maintainer));
auto old = std::make_shared<NonGeneratedCompactDeletionFile>();

auto result = lazy->MergeOldFile(old);
ASSERT_FALSE(result.ok());
ASSERT_TRUE(result.status().ToString().find("LazyCompactDeletionFile") != std::string::npos);
}

TEST(CompactDeletionFileTest, LazyMergeOldFileShouldRejectGeneratedOldLazy) {
auto dir = UniqueTestDirectory::Create();
ASSERT_OK_AND_ASSIGN(std::shared_ptr<FileSystem> fs,
FileSystemFactory::Get("local", dir->Str(), {}));
auto path_factory = std::make_shared<MockIndexPathFactory>(dir->Str());
auto pool = GetDefaultPool();

RoaringBitmap32 roaring;
roaring.Add(1);
std::map<std::string, std::shared_ptr<DeletionVector>> deletion_vectors;
deletion_vectors["data-a"] = std::make_shared<BitmapDeletionVector>(roaring);
auto maintainer = CreateMaintainer(fs, path_factory, pool, deletion_vectors);
maintainer->RemoveDeletionVectorOf("data-a");

ASSERT_OK_AND_ASSIGN(std::shared_ptr<CompactDeletionFile> current,
CompactDeletionFile::LazyGeneration(maintainer));
ASSERT_OK_AND_ASSIGN(std::shared_ptr<CompactDeletionFile> old,
CompactDeletionFile::LazyGeneration(maintainer));

ASSERT_OK_AND_ASSIGN(auto old_file, old->GetOrCompute());
ASSERT_TRUE(old_file.has_value());

auto result = current->MergeOldFile(old);
ASSERT_FALSE(result.ok());
ASSERT_TRUE(result.status().ToString().find("old should not be generated") !=
std::string::npos);
}

} // namespace paimon::test
2 changes: 1 addition & 1 deletion src/paimon/core/compact/compact_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class CompactManager {
public:
virtual ~CompactManager() = default;
/// Add a new file.
virtual void AddNewFile(const std::shared_ptr<DataFileMeta>& file) = 0;
virtual Status AddNewFile(const std::shared_ptr<DataFileMeta>& file) = 0;

virtual std::vector<std::shared_ptr<DataFileMeta>> AllFiles() const = 0;

Expand Down
5 changes: 4 additions & 1 deletion src/paimon/core/compact/noop_compact_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ class NoopCompactManager : public CompactManager {
NoopCompactManager() = default;
~NoopCompactManager() override = default;

void AddNewFile(const std::shared_ptr<DataFileMeta>& file) override {}
Status AddNewFile(const std::shared_ptr<DataFileMeta>& file) override {
(void)file;
return Status::OK();
}

std::vector<std::shared_ptr<DataFileMeta>> AllFiles() const override {
static std::vector<std::shared_ptr<DataFileMeta>> empty;
Expand Down
Loading
Loading