Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -819,19 +819,36 @@ int InstanceRecycler::recycle_deleted_instance() {
<< "s, instance_id=" << instance_id_;
};

// Step 1: Recycle versioned rowsets in recycle space (already marked for deletion)
// Step 1: Recycle tmp rowsets (contains ref count but txn is not committed)
auto recycle_tmp_rowsets_with_mark_delete_enabled = [&]() -> int {
int res = recycle_tmp_rowsets();
if (res == 0 && config::enable_mark_delete_rowset_before_recycle) {
// If mark_delete_rowset_before_recycle is enabled, we will mark delete rowsets before recycling them,
// so we need to recycle tmp rowsets again to make sure all rowsets in recycle space are marked for
// deletion, otherwise we may meet some corner cases that some rowsets are not marked for deletion
// and cannot be recycled.
res = recycle_tmp_rowsets();
}
return res;
};
if (recycle_tmp_rowsets_with_mark_delete_enabled() != 0) {
LOG_WARNING("failed to recycle tmp rowsets").tag("instance_id", instance_id_);
return -1;
}

// Step 2: Recycle versioned rowsets in recycle space (already marked for deletion)
if (recycle_versioned_rowsets() != 0) {
LOG_WARNING("failed to recycle versioned rowsets").tag("instance_id", instance_id_);
return -1;
}

// Step 2: Recycle operation logs (can recycle logs not referenced by snapshots)
// Step 3: Recycle operation logs (can recycle logs not referenced by snapshots)
if (recycle_operation_logs() != 0) {
LOG_WARNING("failed to recycle operation logs").tag("instance_id", instance_id_);
return -1;
}

// Step 3: Check if there are still cluster snapshots
// Step 4: Check if there are still cluster snapshots
bool has_snapshots = false;
if (has_cluster_snapshots(&has_snapshots) != 0) {
LOG(WARNING) << "check instance cluster snapshots failed, instance_id=" << instance_id_;
Expand Down
90 changes: 90 additions & 0 deletions cloud/test/recycler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3326,6 +3326,96 @@ TEST(RecyclerTest, recycle_deleted_instance) {
}
}

// Regression test: if commit_rowset is called but the txn is never committed,
// the rowset stays in meta_rowset_tmp_key with data_rowset_ref_count_key=1.
// recycle_deleted_instance() must call recycle_tmp_rowsets() first so that
// the orphan ref_count is cleaned up before recycle_versioned_rowsets() runs,
// otherwise the instance is never fully deleted and data files are leaked.
TEST(RecyclerTest, recycle_deleted_instance_with_orphan_tmp_rowset) {
    // Make the recycler process the deleted instance immediately instead of
    // waiting out the retention window.
    config::retention_seconds = 0;
    config::force_immediate_recycle = true;
    DORIS_CLOUD_DEFER {
        config::force_immediate_recycle = false;
    };

    auto txn_kv = std::dynamic_pointer_cast<TxnKv>(std::make_shared<MemTxnKv>());
    ASSERT_NE(txn_kv.get(), nullptr);
    ASSERT_EQ(txn_kv->init(), 0);

    // Instance with multi-version read/write and cluster snapshots enabled.
    InstanceInfoPB info;
    info.set_instance_id(instance_id);
    info.set_multi_version_status(MultiVersionStatus::MULTI_VERSION_READ_WRITE);
    info.set_snapshot_switch_status(SnapshotSwitchStatus::SNAPSHOT_SWITCH_ON);
    info.add_obj_info()->set_id("orphan_tmp_rowset_test");

    // Persist the instance record; OperationLogRecycleChecker::init() reads it.
    {
        std::unique_ptr<Transaction> txn;
        ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        txn->put(instance_key({instance_id}), info.SerializeAsString());
        ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
    }

    InstanceRecycler recycler(txn_kv, info, thread_group,
                              std::make_shared<TxnLazyCommitter>(txn_kv));
    ASSERT_EQ(recycler.init(), 0);
    auto accessor = recycler.accessor_map_.begin()->second;

    // Counts the data_rowset_ref_count_key entries for this instance.
    // Uses EXPECT_* because gtest ASSERT_* cannot appear in a non-void lambda.
    auto count_ref_keys = [&]() -> int {
        std::unique_ptr<Transaction> txn;
        EXPECT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
        std::unique_ptr<RangeGetIterator> iter;
        auto begin_key = versioned::data_rowset_ref_count_key({instance_id, 0, ""});
        auto end_key = versioned::data_rowset_ref_count_key({instance_id, INT64_MAX, ""});
        EXPECT_EQ(txn->get(begin_key, end_key, &iter), TxnErrorCode::TXN_OK);
        return iter->size();
    };

    constexpr int64_t tablet_id = 20001;
    constexpr int64_t index_id = 20002;
    constexpr int64_t txn_id = 888888;

    // Mimic commit_rowset without the follow-up txn commit: only the tmp rowset
    // meta and the ref count get written — no meta_rowset_key, no operation log.
    doris::TabletSchemaCloudPB schema;
    schema.set_schema_version(0);
    auto rowset = create_rowset("orphan_tmp_rowset_test", tablet_id, index_id, 2, schema, txn_id);
    ASSERT_EQ(0, create_tmp_rowset(txn_kv.get(), accessor.get(), rowset, false));

    // Precondition: the data file exists and exactly one ref-count key is set.
    {
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_all(&list_iter));
        ASSERT_TRUE(list_iter->has_next());
    }
    ASSERT_EQ(count_ref_keys(), 1);

    // Recycle the deleted instance.
    ASSERT_EQ(0, recycler.recycle_deleted_instance());

    // Postcondition: no data files and no ref-count keys remain.
    {
        std::unique_ptr<ListIterator> list_iter;
        ASSERT_EQ(0, accessor->list_all(&list_iter));
        ASSERT_FALSE(list_iter->has_next());
    }
    ASSERT_EQ(count_ref_keys(), 0);
}

TEST(RecyclerTest, multi_recycler) {
config::recycle_concurrency = 2;
config::recycle_interval_seconds = 10;
Expand Down