From 1be93edb0e6a4b397af8e7b734e3733f07be5923 Mon Sep 17 00:00:00 2001 From: walter Date: Mon, 9 Mar 2026 21:36:20 +0800 Subject: [PATCH] [fix](recycler) use document_remove to delete versioned rowset meta (#61157) --- cloud/src/meta-store/keys.cpp | 9 --- cloud/src/meta-store/keys.h | 7 -- cloud/src/recycler/recycler.cpp | 124 +++++++++++--------------------- cloud/src/recycler/recycler.h | 7 +- 4 files changed, 44 insertions(+), 103 deletions(-) diff --git a/cloud/src/meta-store/keys.cpp b/cloud/src/meta-store/keys.cpp index 88073ae3a0081c..cffe5154bfff4b 100644 --- a/cloud/src/meta-store/keys.cpp +++ b/cloud/src/meta-store/keys.cpp @@ -825,15 +825,6 @@ void data_rowset_ref_count_key(const DataRowsetRefCountKeyInfo& in, std::string* encode_bytes(std::get<2>(in), out); // rowset_id } -void meta_rowset_key(const MetaRowsetKeyInfo& in, std::string* out) { - out->push_back(CLOUD_VERSIONED_KEY_SPACE03); - encode_bytes(META_KEY_PREFIX, out); // "meta" - encode_bytes(std::get<0>(in), out); // instance_id - encode_bytes(META_KEY_INFIX_ROWSET, out); // "rowset" - encode_int64(std::get<1>(in), out); // tablet_id - encode_bytes(std::get<2>(in), out); // rowset_id -} - //============================================================================== // Snapshot keys //============================================================================== diff --git a/cloud/src/meta-store/keys.h b/cloud/src/meta-store/keys.h index 53a24939cdee4d..4fc48c652fc661 100644 --- a/cloud/src/meta-store/keys.h +++ b/cloud/src/meta-store/keys.h @@ -322,10 +322,6 @@ using MetaDeleteBitmapInfo = BasicKeyInfo<__LINE__ , std::tuple>; -// 0x03 "meta" ${instance_id} "rowset" ${tablet_id} ${rowset_id} -> RowsetMetaPB -// 0:instance_id 1:tablet_id 2:rowset_id -using MetaRowsetKeyInfo = BasicKeyInfo<__LINE__, std::tuple>; - // 0x03 "snapshot" ${instance_id} "full" ${timestamp} -> SnapshotPB // 0:instance_id using SnapshotFullKeyInfo = BasicKeyInfo<__LINE__, std::tuple>; @@ -533,9 +529,6 @@ static inline std::string meta_delete_bitmap_key(const MetaDeleteBitmapInfo& in) void data_rowset_ref_count_key(const DataRowsetRefCountKeyInfo& in, std::string* out); static inline std::string data_rowset_ref_count_key(const DataRowsetRefCountKeyInfo& in) { std::string s; data_rowset_ref_count_key(in, &s); return s; } -void meta_rowset_key(const MetaRowsetKeyInfo& in, std::string* out); -static inline std::string meta_rowset_key(const MetaRowsetKeyInfo& in) { std::string s; meta_rowset_key(in, &s); return s; } - void snapshot_full_key(const SnapshotFullKeyInfo& in, std::string* out); static inline std::string snapshot_full_key(const SnapshotFullKeyInfo& in) { std::string s; snapshot_full_key(in, &s); return s; } diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 60e6a763312d38..0121c6b1cc070a 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -1617,41 +1617,6 @@ int64_t calculate_restore_job_expired_time( return final_expiration; } -int get_meta_rowset_key(Transaction* txn, const std::string& instance_id, int64_t tablet_id, - const std::string& rowset_id, int64_t start_version, int64_t end_version, - bool load_key, bool* exist) { - std::string key = - load_key ? versioned::meta_rowset_load_key({instance_id, tablet_id, end_version}) - : versioned::meta_rowset_compact_key({instance_id, tablet_id, end_version}); - RowsetMetaCloudPB rowset_meta; - Versionstamp version; - TxnErrorCode err = versioned::document_get(txn, key, &rowset_meta, &version); - if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { - VLOG_DEBUG << "not found load or compact meta_rowset_key." - << " rowset_id=" << rowset_id << " start_version=" << start_version - << " end_version=" << end_version << " key=" << hex(key); - } else if (err != TxnErrorCode::TXN_OK) { - LOG_INFO("failed to get load or compact meta_rowset_key.") - .tag("rowset_id", rowset_id) - .tag("start_version", start_version) - .tag("end_version", end_version) - .tag("key", hex(key)) - .tag("error_code", err); - return -1; - } else if (rowset_meta.rowset_id_v2() == rowset_id) { - *exist = true; - VLOG_DEBUG << "found load or compact meta_rowset_key." - << " rowset_id=" << rowset_id << " start_version=" << start_version - << " end_version=" << end_version << " key=" << hex(key); - } else { - VLOG_DEBUG << "rowset_id does not match when find load or compact meta_rowset_key." - << " rowset_id=" << rowset_id << " start_version=" << start_version - << " end_version=" << end_version << " key=" << hex(key) - << " found_rowset_id=" << rowset_meta.rowset_id_v2(); - } - return 0; -} - int InstanceRecycler::abort_txn_for_related_rowset(int64_t txn_id) { AbortTxnRequest req; TxnInfoPB txn_info; @@ -4412,29 +4377,18 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t tablet_id, }; std::vector all_tasks; - - auto create_delete_task = [this](const RowsetMetaCloudPB& rs_meta, std::string_view recycle_key, - std::string_view non_versioned_rowset_key = - "") -> RowsetDeleteTask { - RowsetDeleteTask task; - task.rowset_meta = rs_meta; - task.recycle_rowset_key = std::string(recycle_key); - task.non_versioned_rowset_key = std::string(non_versioned_rowset_key); - task.versioned_rowset_key = versioned::meta_rowset_key( - {instance_id_, rs_meta.tablet_id(), rs_meta.rowset_id_v2()}); - return task; - }; - for (const auto& [rs_meta, versionstamp] : load_rowset_metas) { update_rowset_stats(rs_meta); // Version 0-1 rowset has no resource_id and no actual data files, // but still needs ref_count key cleanup, so we add it to all_tasks. // It will be filtered out in Phase 2 when building rowsets_to_delete. - std::string rowset_load_key = + RowsetDeleteTask task; + task.rowset_meta = rs_meta; + task.versioned_rowset_key = versioned::meta_rowset_load_key({instance_id_, tablet_id, rs_meta.end_version()}); - std::string rowset_key = meta_rowset_key({instance_id_, tablet_id, rs_meta.end_version()}); - RowsetDeleteTask task = create_delete_task( - rs_meta, encode_versioned_key(rowset_load_key, versionstamp), rowset_key); + task.non_versioned_rowset_key = + meta_rowset_key({instance_id_, tablet_id, rs_meta.end_version()}); + task.versionstamp = versionstamp; all_tasks.push_back(std::move(task)); } @@ -4443,11 +4397,13 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t tablet_id, // Version 0-1 rowset has no resource_id and no actual data files, // but still needs ref_count key cleanup, so we add it to all_tasks. // It will be filtered out in Phase 2 when building rowsets_to_delete. - std::string rowset_compact_key = versioned::meta_rowset_compact_key( + RowsetDeleteTask task; + task.rowset_meta = rs_meta; + task.versioned_rowset_key = versioned::meta_rowset_compact_key( {instance_id_, tablet_id, rs_meta.end_version()}); - std::string rowset_key = meta_rowset_key({instance_id_, tablet_id, rs_meta.end_version()}); - RowsetDeleteTask task = create_delete_task( - rs_meta, encode_versioned_key(rowset_compact_key, versionstamp), rowset_key); + task.non_versioned_rowset_key = + meta_rowset_key({instance_id_, tablet_id, rs_meta.end_version()}); + task.versionstamp = versionstamp; all_tasks.push_back(std::move(task)); } @@ -4493,7 +4449,9 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t tablet_id, // Version 0-1 rowset has no resource_id and no actual data files, // but still needs ref_count key cleanup, so we add it to all_tasks. // It will be filtered out in Phase 2 when building rowsets_to_delete. - RowsetDeleteTask task = create_delete_task(rowset_meta, k); + RowsetDeleteTask task; + task.rowset_meta = rowset_meta; + task.recycle_rowset_key = k; all_tasks.push_back(std::move(task)); } return 0; @@ -4517,8 +4475,7 @@ int InstanceRecycler::recycle_versioned_tablet(int64_t tablet_id, .tag("tablet_id", tablet_id) .tag("rowset_id", task.rowset_meta.rowset_id_v2()); concurrent_delete_executor.add([this, t = std::move(task)]() mutable { - return recycle_rowset_meta_and_data(t.recycle_rowset_key, t.rowset_meta, - t.non_versioned_rowset_key); + return recycle_rowset_meta_and_data(t); }); } } @@ -5335,7 +5292,11 @@ int InstanceRecycler::recycle_versioned_rowsets() { bool is_compacted = rowset.type() == RecycleRowsetPB::COMPACT; worker_pool->submit( [&, is_compacted, k = std::string(k), rowset_meta = std::move(*rowset_meta)]() { - if (recycle_rowset_meta_and_data(k, rowset_meta) != 0) { + // The load & compact rowset keys are recycled during recycling operation logs. + RowsetDeleteTask task; + task.rowset_meta = rowset_meta; + task.recycle_rowset_key = k; + if (recycle_rowset_meta_and_data(task) != 0) { return; } num_compacted += is_compacted; @@ -5382,10 +5343,9 @@ int InstanceRecycler::recycle_versioned_rowsets() { return ret; } -int InstanceRecycler::recycle_rowset_meta_and_data(std::string_view recycle_rowset_key, - const RowsetMetaCloudPB& rowset_meta, - std::string_view non_versioned_rowset_key) { +int InstanceRecycler::recycle_rowset_meta_and_data(const RowsetDeleteTask& task) { constexpr int MAX_RETRY = 10; + const RowsetMetaCloudPB& rowset_meta = task.rowset_meta; int64_t tablet_id = rowset_meta.tablet_id(); const std::string& rowset_id = rowset_meta.rowset_id_v2(); std::string_view reference_instance_id = instance_id_; @@ -5395,7 +5355,7 @@ int InstanceRecycler::recycle_rowset_meta_and_data(std::string_view recycle_rows AnnotateTag tablet_id_tag("tablet_id", tablet_id); AnnotateTag rowset_id_tag("rowset_id", rowset_id); - AnnotateTag rowset_key_tag("recycle_rowset_key", hex(recycle_rowset_key)); + AnnotateTag rowset_key_tag("recycle_rowset_key", hex(task.recycle_rowset_key)); AnnotateTag instance_id_tag("instance_id", instance_id_); AnnotateTag ref_instance_id_tag("ref_instance_id", reference_instance_id); for (int i = 0; i < MAX_RETRY; ++i) { @@ -5460,13 +5420,6 @@ int InstanceRecycler::recycle_rowset_meta_and_data(std::string_view recycle_rows LOG_INFO("remove versioned delete bitmap kv") .tag("begin", hex(versioned_dbm_start_key)) .tag("end", hex(versioned_dbm_end_key)); - - std::string meta_rowset_key_begin = - versioned::meta_rowset_key({reference_instance_id, tablet_id, rowset_id}); - std::string meta_rowset_key_end = meta_rowset_key_begin; - encode_int64(INT64_MAX, &meta_rowset_key_end); - txn->remove(meta_rowset_key_begin, meta_rowset_key_end); - LOG_INFO("remove meta rowset key").tag("key", hex(meta_rowset_key_begin)); } else { // Decrease the rowset ref count. // @@ -5479,13 +5432,22 @@ int InstanceRecycler::recycle_rowset_meta_and_data(std::string_view recycle_rows .tag("ref_count_key", hex(rowset_ref_count_key)); } - if (!recycle_rowset_key.empty()) { // empty when recycle ref rowsets for deleted instance - txn->remove(recycle_rowset_key); - LOG_INFO("remove recycle rowset key").tag("key", hex(recycle_rowset_key)); + if (!task.versioned_rowset_key.empty()) { + versioned::document_remove(txn.get(), task.versioned_rowset_key, + task.versionstamp); + LOG_INFO("remove versioned meta rowset key").tag("key", hex(task.versioned_rowset_key)); } - if (!non_versioned_rowset_key.empty()) { - txn->remove(non_versioned_rowset_key); - LOG_INFO("remove non versioned rowset key").tag("key", hex(non_versioned_rowset_key)); + + if (!task.non_versioned_rowset_key.empty()) { + txn->remove(task.non_versioned_rowset_key); + LOG_INFO("remove non versioned rowset key") + .tag("key", hex(task.non_versioned_rowset_key)); + } + + // empty when recycle ref rowsets for deleted instance + if (!task.recycle_rowset_key.empty()) { + txn->remove(task.recycle_rowset_key); + LOG_INFO("remove recycle rowset key").tag("key", hex(task.recycle_rowset_key)); } err = txn->commit(); @@ -7490,15 +7452,13 @@ int InstanceRecycler::cleanup_rowset_metadata(const std::vectorremove(task.versioned_rowset_key, versioned_rowset_key_end); + versioned::document_remove( + txn.get(), task.versioned_rowset_key, task.versionstamp); LOG_INFO("remove versioned meta rowset key in cleanup phase") .tag("instance_id", instance_id_) .tag("tablet_id", tablet_id) .tag("rowset_id", rowset_id) - .tag("begin", hex(task.versioned_rowset_key)) - .tag("end", hex(versioned_rowset_key_end)); + .tag("key_prefix", hex(task.versioned_rowset_key)); } if (!task.non_versioned_rowset_key.empty()) { diff --git a/cloud/src/recycler/recycler.h b/cloud/src/recycler/recycler.h index 5280c67653f6e0..6598a44aff7197 100644 --- a/cloud/src/recycler/recycler.h +++ b/cloud/src/recycler/recycler.h @@ -140,6 +140,7 @@ struct RowsetDeleteTask { std::string recycle_rowset_key; // Primary key marking "pending recycle" std::string non_versioned_rowset_key; // Legacy non-versioned rowset meta key std::string versioned_rowset_key; // Versioned meta rowset key + Versionstamp versionstamp; std::string rowset_ref_count_key; }; @@ -491,12 +492,8 @@ class InstanceRecycler { // Recycle rowset meta and data, return 0 for success otherwise error // - // Both recycle_rowset_key and non_versioned_rowset_key will be removed in the same transaction. - // // This function will decrease the rowset ref count and remove the rowset meta and data if the ref count is 1. - int recycle_rowset_meta_and_data(std::string_view recycle_rowset_key, - const RowsetMetaCloudPB& rowset_meta, - std::string_view non_versioned_rowset_key = ""); + int recycle_rowset_meta_and_data(const RowsetDeleteTask& task); // Classify rowset task by ref_count, return 0 to add to batch delete, 1 if handled (ref>1), -1 on error int classify_rowset_task_by_ref_count(RowsetDeleteTask& task,