diff --git a/cloud/src/recycler/recycler.cpp b/cloud/src/recycler/recycler.cpp index 60e6a763312d38..b65e226efe076e 100644 --- a/cloud/src/recycler/recycler.cpp +++ b/cloud/src/recycler/recycler.cpp @@ -819,19 +819,36 @@ int InstanceRecycler::recycle_deleted_instance() { << "s, instance_id=" << instance_id_; }; - // Step 1: Recycle versioned rowsets in recycle space (already marked for deletion) + // Step 1: Recycle tmp rowsets (contains ref count but txn is not committed) + auto recycle_tmp_rowsets_with_mark_delete_enabled = [&]() -> int { + int res = recycle_tmp_rowsets(); + if (res == 0 && config::enable_mark_delete_rowset_before_recycle) { + // When mark_delete_rowset_before_recycle is enabled, rowsets are marked for deletion before + // being recycled, so run recycle_tmp_rowsets() a second time to ensure every rowset in the + // recycle space has been marked for deletion; otherwise we may hit corner cases where some + // rowsets are never marked and therefore can never be recycled. + res = recycle_tmp_rowsets(); + } + return res; + }; + if (recycle_tmp_rowsets_with_mark_delete_enabled() != 0) { + LOG_WARNING("failed to recycle tmp rowsets").tag("instance_id", instance_id_); + return -1; + } + + // Step 2: Recycle versioned rowsets in recycle space (already marked for deletion) if (recycle_versioned_rowsets() != 0) { LOG_WARNING("failed to recycle versioned rowsets").tag("instance_id", instance_id_); return -1; } - // Step 2: Recycle operation logs (can recycle logs not referenced by snapshots) + // Step 3: Recycle operation logs (can recycle logs not referenced by snapshots) if (recycle_operation_logs() != 0) { LOG_WARNING("failed to recycle operation logs").tag("instance_id", instance_id_); return -1; } - // Step 3: Check if there are still cluster snapshots + // Step 4: Check if there are still cluster snapshots bool has_snapshots = false; if (has_cluster_snapshots(&has_snapshots) != 0) { LOG(WARNING) << "check instance cluster snapshots failed, instance_id=" << 
instance_id_; diff --git a/cloud/test/recycler_test.cpp b/cloud/test/recycler_test.cpp index 976ca7a18d9a32..2f3981b439ba12 100644 --- a/cloud/test/recycler_test.cpp +++ b/cloud/test/recycler_test.cpp @@ -3326,6 +3326,96 @@ TEST(RecyclerTest, recycle_deleted_instance) { } } +// Regression test: if commit_rowset is called but the txn is never committed, +// the rowset stays in meta_rowset_tmp_key with data_rowset_ref_count_key=1. +// recycle_deleted_instance() must call recycle_tmp_rowsets() first so that +// the orphan ref_count is cleaned up before recycle_ref_rowsets() runs, +// otherwise the instance is never fully deleted and data files are leaked. +TEST(RecyclerTest, recycle_deleted_instance_with_orphan_tmp_rowset) { + config::retention_seconds = 0; + config::force_immediate_recycle = true; + DORIS_CLOUD_DEFER { + config::force_immediate_recycle = false; + }; + + auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>()); + ASSERT_NE(txn_kv.get(), nullptr); + ASSERT_EQ(txn_kv->init(), 0); + + // Create instance with multi-version read/write and snapshot support + InstanceInfoPB instance_info; + instance_info.set_instance_id(instance_id); + instance_info.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_READ_WRITE); + instance_info.set_snapshot_switch_status(SnapshotSwitchStatus::SNAPSHOT_SWITCH_ON); + auto* obj_info = instance_info.add_obj_info(); + obj_info->set_id("orphan_tmp_rowset_test"); + + // Write instance info to FDB (required by OperationLogRecycleChecker::init()) + { + std::unique_ptr<Transaction> txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::string key = instance_key({instance_id}); + std::string val = instance_info.SerializeAsString(); + txn->put(key, val); + ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK); + } + + InstanceRecycler recycler(txn_kv, instance_info, thread_group, + std::make_shared<TxnLazyCommitter>(txn_kv)); + ASSERT_EQ(recycler.init(), 0); + auto accessor = recycler.accessor_map_.begin()->second; + + constexpr int64_t 
tablet_id = 20001; + constexpr int64_t index_id = 20002; + constexpr int64_t txn_id = 888888; + + // Simulate commit_rowset: write meta_rowset_tmp_key + increment ref_count, + // but do NOT commit the txn (no meta_rowset_key, no operation log). + doris::TabletSchemaCloudPB schema; + schema.set_schema_version(0); + auto rowset = create_rowset("orphan_tmp_rowset_test", tablet_id, index_id, 2, schema, txn_id); + ASSERT_EQ(0, create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, false)); + + // Verify the data file exists + { + std::unique_ptr<ListIterator> list_iter; + ASSERT_EQ(0, accessor->list_all(&list_iter)); + ASSERT_TRUE(list_iter->has_next()); + } + + // Verify the ref_count key exists + { + std::unique_ptr<Transaction> txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::unique_ptr<RangeGetIterator> it; + auto begin_key = versioned::data_rowset_ref_count_key({instance_id, 0, ""}); + auto end_key = versioned::data_rowset_ref_count_key({instance_id, INT64_MAX, ""}); + ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); + ASSERT_EQ(it->size(), 1); + } + + // Recycle deleted instance + ASSERT_EQ(0, recycler.recycle_deleted_instance()); + + // All data files must be deleted + { + std::unique_ptr<ListIterator> list_iter; + ASSERT_EQ(0, accessor->list_all(&list_iter)); + ASSERT_FALSE(list_iter->has_next()); + } + + // All ref_count keys must be cleaned up + { + std::unique_ptr<Transaction> txn; + ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK); + std::unique_ptr<RangeGetIterator> it; + auto begin_key = versioned::data_rowset_ref_count_key({instance_id, 0, ""}); + auto end_key = versioned::data_rowset_ref_count_key({instance_id, INT64_MAX, ""}); + ASSERT_EQ(txn->get(begin_key, end_key, &it), TxnErrorCode::TXN_OK); + ASSERT_EQ(it->size(), 0); + } +} + TEST(RecyclerTest, multi_recycler) { config::recycle_concurrency = 2; config::recycle_interval_seconds = 10;