Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions cloud/src/meta-store/keys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -825,15 +825,6 @@ void data_rowset_ref_count_key(const DataRowsetRefCountKeyInfo& in, std::string*
encode_bytes(std::get<2>(in), out); // rowset_id
}

void meta_rowset_key(const MetaRowsetKeyInfo& in, std::string* out) {
out->push_back(CLOUD_VERSIONED_KEY_SPACE03);
encode_bytes(META_KEY_PREFIX, out); // "meta"
encode_bytes(std::get<0>(in), out); // instance_id
encode_bytes(META_KEY_INFIX_ROWSET, out); // "rowset"
encode_int64(std::get<1>(in), out); // tablet_id
encode_bytes(std::get<2>(in), out); // rowset_id
}

//==============================================================================
// Snapshot keys
//==============================================================================
Expand Down
7 changes: 0 additions & 7 deletions cloud/src/meta-store/keys.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,6 @@ using MetaDeleteBitmapInfo = BasicKeyInfo<__LINE__ , std::tuple<std::string, int
// 0:instance_id 1:tablet_id 2:rowset_id
using DataRowsetRefCountKeyInfo = BasicKeyInfo<__LINE__, std::tuple<std::string, int64_t, std::string>>;

// 0x03 "meta" ${instance_id} "rowset" ${tablet_id} ${rowset_id} -> RowsetMetaPB
// 0:instance_id 1:tablet_id 2:rowset_id
using MetaRowsetKeyInfo = BasicKeyInfo<__LINE__, std::tuple<std::string, int64_t, std::string>>;

// 0x03 "snapshot" ${instance_id} "full" ${timestamp} -> SnapshotPB
// 0:instance_id
using SnapshotFullKeyInfo = BasicKeyInfo<__LINE__, std::tuple<std::string>>;
Expand Down Expand Up @@ -533,9 +529,6 @@ static inline std::string meta_delete_bitmap_key(const MetaDeleteBitmapInfo& in)
void data_rowset_ref_count_key(const DataRowsetRefCountKeyInfo& in, std::string* out);
static inline std::string data_rowset_ref_count_key(const DataRowsetRefCountKeyInfo& in) { std::string s; data_rowset_ref_count_key(in, &s); return s; }

void meta_rowset_key(const MetaRowsetKeyInfo& in, std::string* out);
static inline std::string meta_rowset_key(const MetaRowsetKeyInfo& in) { std::string s; meta_rowset_key(in, &s); return s; }

void snapshot_full_key(const SnapshotFullKeyInfo& in, std::string* out);
static inline std::string snapshot_full_key(const SnapshotFullKeyInfo& in) { std::string s; snapshot_full_key(in, &s); return s; }

Expand Down
124 changes: 42 additions & 82 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1617,41 +1617,6 @@ int64_t calculate_restore_job_expired_time(
return final_expiration;
}

int get_meta_rowset_key(Transaction* txn, const std::string& instance_id, int64_t tablet_id,
const std::string& rowset_id, int64_t start_version, int64_t end_version,
bool load_key, bool* exist) {
std::string key =
load_key ? versioned::meta_rowset_load_key({instance_id, tablet_id, end_version})
: versioned::meta_rowset_compact_key({instance_id, tablet_id, end_version});
RowsetMetaCloudPB rowset_meta;
Versionstamp version;
TxnErrorCode err = versioned::document_get(txn, key, &rowset_meta, &version);
if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
VLOG_DEBUG << "not found load or compact meta_rowset_key."
<< " rowset_id=" << rowset_id << " start_version=" << start_version
<< " end_version=" << end_version << " key=" << hex(key);
} else if (err != TxnErrorCode::TXN_OK) {
LOG_INFO("failed to get load or compact meta_rowset_key.")
.tag("rowset_id", rowset_id)
.tag("start_version", start_version)
.tag("end_version", end_version)
.tag("key", hex(key))
.tag("error_code", err);
return -1;
} else if (rowset_meta.rowset_id_v2() == rowset_id) {
*exist = true;
VLOG_DEBUG << "found load or compact meta_rowset_key."
<< " rowset_id=" << rowset_id << " start_version=" << start_version
<< " end_version=" << end_version << " key=" << hex(key);
} else {
VLOG_DEBUG << "rowset_id does not match when find load or compact meta_rowset_key."
<< " rowset_id=" << rowset_id << " start_version=" << start_version
<< " end_version=" << end_version << " key=" << hex(key)
<< " found_rowset_id=" << rowset_meta.rowset_id_v2();
}
return 0;
}

int InstanceRecycler::abort_txn_for_related_rowset(int64_t txn_id) {
AbortTxnRequest req;
TxnInfoPB txn_info;
Expand Down Expand Up @@ -4412,29 +4377,18 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t tablet_id,
};

std::vector<RowsetDeleteTask> all_tasks;

auto create_delete_task = [this](const RowsetMetaCloudPB& rs_meta, std::string_view recycle_key,
std::string_view non_versioned_rowset_key =
"") -> RowsetDeleteTask {
RowsetDeleteTask task;
task.rowset_meta = rs_meta;
task.recycle_rowset_key = std::string(recycle_key);
task.non_versioned_rowset_key = std::string(non_versioned_rowset_key);
task.versioned_rowset_key = versioned::meta_rowset_key(
{instance_id_, rs_meta.tablet_id(), rs_meta.rowset_id_v2()});
return task;
};

for (const auto& [rs_meta, versionstamp] : load_rowset_metas) {
update_rowset_stats(rs_meta);
// Version 0-1 rowset has no resource_id and no actual data files,
// but still needs ref_count key cleanup, so we add it to all_tasks.
// It will be filtered out in Phase 2 when building rowsets_to_delete.
std::string rowset_load_key =
RowsetDeleteTask task;
task.rowset_meta = rs_meta;
task.versioned_rowset_key =
versioned::meta_rowset_load_key({instance_id_, tablet_id, rs_meta.end_version()});
std::string rowset_key = meta_rowset_key({instance_id_, tablet_id, rs_meta.end_version()});
RowsetDeleteTask task = create_delete_task(
rs_meta, encode_versioned_key(rowset_load_key, versionstamp), rowset_key);
task.non_versioned_rowset_key =
meta_rowset_key({instance_id_, tablet_id, rs_meta.end_version()});
task.versionstamp = versionstamp;
all_tasks.push_back(std::move(task));
}

Expand All @@ -4443,11 +4397,13 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t tablet_id,
// Version 0-1 rowset has no resource_id and no actual data files,
// but still needs ref_count key cleanup, so we add it to all_tasks.
// It will be filtered out in Phase 2 when building rowsets_to_delete.
std::string rowset_compact_key = versioned::meta_rowset_compact_key(
RowsetDeleteTask task;
task.rowset_meta = rs_meta;
task.versioned_rowset_key = versioned::meta_rowset_compact_key(
{instance_id_, tablet_id, rs_meta.end_version()});
std::string rowset_key = meta_rowset_key({instance_id_, tablet_id, rs_meta.end_version()});
RowsetDeleteTask task = create_delete_task(
rs_meta, encode_versioned_key(rowset_compact_key, versionstamp), rowset_key);
task.non_versioned_rowset_key =
meta_rowset_key({instance_id_, tablet_id, rs_meta.end_version()});
task.versionstamp = versionstamp;
all_tasks.push_back(std::move(task));
}

Expand Down Expand Up @@ -4493,7 +4449,9 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t tablet_id,
// Version 0-1 rowset has no resource_id and no actual data files,
// but still needs ref_count key cleanup, so we add it to all_tasks.
// It will be filtered out in Phase 2 when building rowsets_to_delete.
RowsetDeleteTask task = create_delete_task(rowset_meta, k);
RowsetDeleteTask task;
task.rowset_meta = rowset_meta;
task.recycle_rowset_key = k;
all_tasks.push_back(std::move(task));
}
return 0;
Expand All @@ -4517,8 +4475,7 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t tablet_id,
.tag("tablet_id", tablet_id)
.tag("rowset_id", task.rowset_meta.rowset_id_v2());
concurrent_delete_executor.add([this, t = std::move(task)]() mutable {
return recycle_rowset_meta_and_data(t.recycle_rowset_key, t.rowset_meta,
t.non_versioned_rowset_key);
return recycle_rowset_meta_and_data(t);
});
}
}
Expand Down Expand Up @@ -5335,7 +5292,11 @@ int InstanceRecycler::recycle_versioned_rowsets() {
bool is_compacted = rowset.type() == RecycleRowsetPB::COMPACT;
worker_pool->submit(
[&, is_compacted, k = std::string(k), rowset_meta = std::move(*rowset_meta)]() {
if (recycle_rowset_meta_and_data(k, rowset_meta) != 0) {
// The load & compact rowset keys are recycled during recycling operation logs.
RowsetDeleteTask task;
task.rowset_meta = rowset_meta;
task.recycle_rowset_key = k;
if (recycle_rowset_meta_and_data(task) != 0) {
return;
}
num_compacted += is_compacted;
Expand Down Expand Up @@ -5382,10 +5343,9 @@ int InstanceRecycler::recycle_versioned_rowsets() {
return ret;
}

int InstanceRecycler::recycle_rowset_meta_and_data(std::string_view recycle_rowset_key,
const RowsetMetaCloudPB& rowset_meta,
std::string_view non_versioned_rowset_key) {
int InstanceRecycler::recycle_rowset_meta_and_data(const RowsetDeleteTask& task) {
constexpr int MAX_RETRY = 10;
const RowsetMetaCloudPB& rowset_meta = task.rowset_meta;
int64_t tablet_id = rowset_meta.tablet_id();
const std::string& rowset_id = rowset_meta.rowset_id_v2();
std::string_view reference_instance_id = instance_id_;
Expand All @@ -5395,7 +5355,7 @@ int InstanceRecycler::recycle_rowset_meta_and_data(std::string_view recycle_rows

AnnotateTag tablet_id_tag("tablet_id", tablet_id);
AnnotateTag rowset_id_tag("rowset_id", rowset_id);
AnnotateTag rowset_key_tag("recycle_rowset_key", hex(recycle_rowset_key));
AnnotateTag rowset_key_tag("recycle_rowset_key", hex(task.recycle_rowset_key));
AnnotateTag instance_id_tag("instance_id", instance_id_);
AnnotateTag ref_instance_id_tag("ref_instance_id", reference_instance_id);
for (int i = 0; i < MAX_RETRY; ++i) {
Expand Down Expand Up @@ -5460,13 +5420,6 @@ int InstanceRecycler::recycle_rowset_meta_and_data(std::string_view recycle_rows
LOG_INFO("remove versioned delete bitmap kv")
.tag("begin", hex(versioned_dbm_start_key))
.tag("end", hex(versioned_dbm_end_key));

std::string meta_rowset_key_begin =
versioned::meta_rowset_key({reference_instance_id, tablet_id, rowset_id});
std::string meta_rowset_key_end = meta_rowset_key_begin;
encode_int64(INT64_MAX, &meta_rowset_key_end);
txn->remove(meta_rowset_key_begin, meta_rowset_key_end);
LOG_INFO("remove meta rowset key").tag("key", hex(meta_rowset_key_begin));
} else {
// Decrease the rowset ref count.
//
Expand All @@ -5479,13 +5432,22 @@ int InstanceRecycler::recycle_rowset_meta_and_data(std::string_view recycle_rows
.tag("ref_count_key", hex(rowset_ref_count_key));
}

if (!recycle_rowset_key.empty()) { // empty when recycle ref rowsets for deleted instance
txn->remove(recycle_rowset_key);
LOG_INFO("remove recycle rowset key").tag("key", hex(recycle_rowset_key));
if (!task.versioned_rowset_key.empty()) {
versioned::document_remove<RowsetMetaCloudPB>(txn.get(), task.versioned_rowset_key,
task.versionstamp);
LOG_INFO("remove versioned meta rowset key").tag("key", hex(task.versioned_rowset_key));
}
if (!non_versioned_rowset_key.empty()) {
txn->remove(non_versioned_rowset_key);
LOG_INFO("remove non versioned rowset key").tag("key", hex(non_versioned_rowset_key));

if (!task.non_versioned_rowset_key.empty()) {
txn->remove(task.non_versioned_rowset_key);
LOG_INFO("remove non versioned rowset key")
.tag("key", hex(task.non_versioned_rowset_key));
}

// empty when recycle ref rowsets for deleted instance
if (!task.recycle_rowset_key.empty()) {
txn->remove(task.recycle_rowset_key);
LOG_INFO("remove recycle rowset key").tag("key", hex(task.recycle_rowset_key));
}

err = txn->commit();
Expand Down Expand Up @@ -7490,15 +7452,13 @@ int InstanceRecycler::cleanup_rowset_metadata(const std::vector<RowsetDeleteTask

// Remove versioned meta rowset key
if (!task.versioned_rowset_key.empty()) {
std::string versioned_rowset_key_end = task.versioned_rowset_key;
encode_int64(INT64_MAX, &versioned_rowset_key_end);
txn->remove(task.versioned_rowset_key, versioned_rowset_key_end);
versioned::document_remove<RowsetMetaCloudPB>(
txn.get(), task.versioned_rowset_key, task.versionstamp);
LOG_INFO("remove versioned meta rowset key in cleanup phase")
.tag("instance_id", instance_id_)
.tag("tablet_id", tablet_id)
.tag("rowset_id", rowset_id)
.tag("begin", hex(task.versioned_rowset_key))
.tag("end", hex(versioned_rowset_key_end));
.tag("key_prefix", hex(task.versioned_rowset_key));
}

if (!task.non_versioned_rowset_key.empty()) {
Expand Down
7 changes: 2 additions & 5 deletions cloud/src/recycler/recycler.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ struct RowsetDeleteTask {
std::string recycle_rowset_key; // Primary key marking "pending recycle"
std::string non_versioned_rowset_key; // Legacy non-versioned rowset meta key
std::string versioned_rowset_key; // Versioned meta rowset key
Versionstamp versionstamp;
std::string rowset_ref_count_key;
};

Expand Down Expand Up @@ -491,12 +492,8 @@ class InstanceRecycler {

// Recycle rowset meta and data, return 0 for success otherwise error
//
// Both recycle_rowset_key and non_versioned_rowset_key will be removed in the same transaction.
//
// This function will decrease the rowset ref count and remove the rowset meta and data if the ref count is 1.
int recycle_rowset_meta_and_data(std::string_view recycle_rowset_key,
const RowsetMetaCloudPB& rowset_meta,
std::string_view non_versioned_rowset_key = "");
int recycle_rowset_meta_and_data(const RowsetDeleteTask& task);

// Classify rowset task by ref_count, return 0 to add to batch delete, 1 if handled (ref>1), -1 on error
int classify_rowset_task_by_ref_count(RowsetDeleteTask& task,
Expand Down
Loading