diff --git a/cloud/src/common/config.h b/cloud/src/common/config.h index 214f759999566f..243ca05c680318 100644 --- a/cloud/src/common/config.h +++ b/cloud/src/common/config.h @@ -320,6 +320,10 @@ CONF_Int32(parallel_txn_lazy_commit_num_threads, "0"); // hardware concurrency i CONF_mInt64(txn_lazy_max_rowsets_per_batch, "1000"); CONF_mBool(txn_lazy_commit_shuffle_partitions, "true"); CONF_Int64(txn_lazy_commit_shuffle_seed, "0"); // 0 means generate a random seed +// WARNING: All meta-servers MUST be upgraded before changing this to true. +// When enabled, defer deleting pending delete bitmaps until lazy commit completes. +// This reduces contention during transaction commit by extending delete bitmap locks. +CONF_mBool(txn_lazy_commit_defer_deleting_pending_delete_bitmaps, "false"); // max TabletIndexPB num for batch get CONF_Int32(max_tablet_index_num_per_batch, "1000"); CONF_Int32(max_restore_job_rowsets_per_batch, "1000"); diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index adddd83744f0e8..863f3f7dea243a 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -1234,6 +1234,64 @@ void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stat } } +// process mow table, check lock and update lock timeout +void process_mow_when_commit_txn_deferred( + const CommitTxnRequest* request, const std::string& instance_id, MetaServiceCode& code, + std::string& msg, std::unique_ptr& txn, + std::unordered_map>& table_id_tablet_ids) { + int64_t txn_id = request->txn_id(); + std::stringstream ss; + std::vector lock_keys; + lock_keys.reserve(request->mow_table_ids().size()); + for (auto table_id : request->mow_table_ids()) { + lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, table_id, -1})); + } + std::vector> lock_values; + TxnErrorCode err = txn->batch_get(&lock_values, lock_keys); + if (err != TxnErrorCode::TXN_OK) { + ss << "failed to get delete bitmap update lock key info, instance_id=" << instance_id + << " err=" << err; + msg = ss.str(); + code = cast_as(err); + LOG(WARNING) << msg << " txn_id=" << txn_id; + return; + } + size_t total_locks = lock_keys.size(); + for (size_t i = 0; i < total_locks; i++) { + int64_t table_id = request->mow_table_ids(i); + // When the key does not exist, it means the lock has been acquired + // by another transaction and successfully committed. + if (!lock_values[i].has_value()) { + ss << "get delete bitmap update lock info, lock is expired" + << " table_id=" << table_id << " key=" << hex(lock_keys[i]) << " txn_id=" << txn_id; + code = MetaServiceCode::LOCK_EXPIRED; + msg = ss.str(); + LOG(WARNING) << msg << " txn_id=" << txn_id; + return; + } + + DeleteBitmapUpdateLockPB lock_info; + if (!lock_info.ParseFromString(lock_values[i].value())) [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + msg = "failed to parse DeleteBitmapUpdateLockPB"; + LOG(WARNING) << msg << " txn_id=" << txn_id; + return; + } + if (lock_info.lock_id() != request->txn_id()) { + ss << "lock is expired, locked by lock_id=" << lock_info.lock_id(); + msg = ss.str(); + code = MetaServiceCode::LOCK_EXPIRED; + return; + } + lock_info.set_expiration(std::numeric_limits::max()); + txn->put(lock_keys[i], lock_info.SerializeAsString()); + LOG(INFO) << "refresh delete bitmap lock, lock_key=" << hex(lock_keys[i]) + << " table_id=" << table_id << " txn_id=" << txn_id; + } + lock_keys.clear(); + lock_values.clear(); +} + // process mow table, check lock and remove pending key void process_mow_when_commit_txn( const CommitTxnRequest* request, const std::string& instance_id, MetaServiceCode& code, @@ -2396,10 +2454,24 @@ void MetaServiceImpl::commit_txn_eventually( for (auto& [tablet_id, tablet_idx] : tablet_ids) { table_id_tablet_ids[tablet_idx.table_id()].push_back(tablet_id); } - process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids); - if (code != MetaServiceCode::OK) { - LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code; - return; + if (config::txn_lazy_commit_defer_deleting_pending_delete_bitmaps) { + txn_info.clear_table_ids(); + for (auto& [table_id, _] : table_id_tablet_ids) { + txn_info.add_table_ids(table_id); + } + txn_info.set_defer_deleting_pending_delete_bitmaps(true); + process_mow_when_commit_txn_deferred(request, instance_id, code, msg, txn, + table_id_tablet_ids); + if (code != MetaServiceCode::OK) { + LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code; + return; + } + } else { + process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids); + if (code != MetaServiceCode::OK) { + LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code; + return; + } } // table_id -> version, for response diff --git a/cloud/src/meta-service/txn_lazy_committer.cpp b/cloud/src/meta-service/txn_lazy_committer.cpp index 115161693b8aad..130745e5e0ba6a 100644 --- a/cloud/src/meta-service/txn_lazy_committer.cpp +++ b/cloud/src/meta-service/txn_lazy_committer.cpp @@ -213,7 +213,8 @@ void convert_tmp_rowsets( MetaServiceCode& code, std::string& msg, int64_t db_id, std::vector>& tmp_rowsets_meta, std::map& tablet_ids, bool is_versioned_write, - bool is_versioned_read, Versionstamp versionstamp, ResourceManager* resource_mgr) { + bool is_versioned_read, Versionstamp versionstamp, ResourceManager* resource_mgr, + bool defer_deleting_pending_delete_bitmaps) { std::stringstream ss; std::unique_ptr txn; TxnErrorCode err = txn_kv->create_txn(&txn); @@ -483,6 +484,15 @@ void convert_tmp_rowsets( } } + if (defer_deleting_pending_delete_bitmaps) { + for (auto& [tablet_id, _] : tablet_stats) { + std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id}); + txn->remove(pending_key); + LOG(INFO) << "remove delete bitmap pending key, pending_key=" << hex(pending_key) + << " txn_id=" << txn_id; + } + } + TEST_SYNC_POINT_RETURN_WITH_VOID("convert_tmp_rowsets::before_commit", &code); err = txn->commit(); if (err != TxnErrorCode::TXN_OK) { @@ -495,7 +505,7 @@ void convert_tmp_rowsets( void make_committed_txn_visible(const std::string& instance_id, int64_t db_id, int64_t txn_id, std::shared_ptr txn_kv, MetaServiceCode& code, - std::string& msg) { + std::string& msg, bool defer_deleting_pending_delete_bitmaps) { // 1. visible txn info // 2. remove running key and put recycle txn key @@ -551,6 +561,35 @@ void make_committed_txn_visible(const std::string& instance_id, int64_t db_id, i LOG(INFO) << "remove running_key=" << hex(running_key) << " txn_id=" << txn_id; txn->remove(running_key); + // Remove delete bitmap locks if deferring deletion + if (defer_deleting_pending_delete_bitmaps) { + for (int64_t table_id : txn_info.table_ids()) { + std::string lock_key = + meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); + // Read the lock first to check if it still belongs to the current txn + std::string lock_val; + TxnErrorCode err = txn->get(lock_key, &lock_val); + if (err == TxnErrorCode::TXN_OK) { + DeleteBitmapUpdateLockPB lock_info; + if (lock_info.ParseFromString(lock_val)) { + // Only remove the lock if it still belongs to the current txn + if (lock_info.lock_id() == txn_id) { + txn->remove(lock_key); + LOG(INFO) << "remove delete bitmap lock, lock_key=" << hex(lock_key) + << " table_id=" << table_id << " txn_id=" << txn_id; + } else { + LOG(WARNING) << "delete bitmap lock is held by another txn, " + << "lock_key=" << hex(lock_key) << " table_id=" << table_id + << " expected_txn_id=" << txn_id + << " actual_lock_id=" << lock_info.lock_id(); + } + } + } else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) { + LOG(WARNING) << "failed to get delete bitmap lock, lock_key=" << hex(lock_key) + << " table_id=" << table_id << " err=" << err; + } + } + } // The recycle txn pb will be written when recycle the commit txn log, // if the txn versioned write is enabled. if (!txn_info.versioned_write()) { @@ -635,6 +674,7 @@ void TxnLazyCommitTask::commit() { return; } + bool defer_deleting_pending_delete_bitmaps = txn_info.defer_deleting_pending_delete_bitmaps(); bool is_versioned_write = txn_info.versioned_write(); bool is_versioned_read = txn_info.versioned_read(); CloneChainReader meta_reader(instance_id_, txn_kv_.get(), @@ -690,7 +730,8 @@ void TxnLazyCommitTask::commit() { executor.add([&, partition_id, this]() { return commit_partition(db_id, partition_id, partition_to_tmp_rowset_metas.at(partition_id), - is_versioned_read, is_versioned_write); + is_versioned_read, is_versioned_write, + defer_deleting_pending_delete_bitmaps); }); } bool finished = false; @@ -711,7 +752,8 @@ void TxnLazyCommitTask::commit() { for (int64_t partition_id : partition_ids) { std::tie(code_, msg_) = commit_partition( db_id, partition_id, partition_to_tmp_rowset_metas[partition_id], - is_versioned_read, is_versioned_write); + is_versioned_read, is_versioned_write, + defer_deleting_pending_delete_bitmaps); if (code_ != MetaServiceCode::OK) break; } } @@ -720,7 +762,8 @@ void TxnLazyCommitTask::commit() { LOG(WARNING) << "txn_id=" << txn_id_ << " code=" << code_ << " msg=" << msg_; break; } - make_committed_txn_visible(instance_id_, db_id, txn_id_, txn_kv_, code_, msg_); + make_committed_txn_visible(instance_id_, db_id, txn_id_, txn_kv_, code_, msg_, + defer_deleting_pending_delete_bitmaps); } while (false); } while (code_ == MetaServiceCode::KV_TXN_CONFLICT && retry_times++ < config::txn_store_retry_times); @@ -729,7 +772,8 @@ void TxnLazyCommitTask::commit() { std::pair TxnLazyCommitTask::commit_partition( int64_t db_id, int64_t partition_id, const std::vector>& tmp_rowset_metas, - bool is_versioned_read, bool is_versioned_write) { + bool is_versioned_read, bool is_versioned_write, + bool defer_deleting_pending_delete_bitmaps) { std::stringstream ss; CloneChainReader meta_reader(instance_id_, txn_kv_.get(), txn_lazy_committer_->resource_manager().get()); @@ -802,7 +846,8 @@ std::pair TxnLazyCommitTask::commit_partition( convert_tmp_rowsets(instance_id_, txn_id_, txn_kv_, code, msg, db_id, sub_partition_tmp_rowset_metas, tablet_ids, is_versioned_write, is_versioned_read, versionstamp, - txn_lazy_committer_->resource_manager().get()); + txn_lazy_committer_->resource_manager().get(), + defer_deleting_pending_delete_bitmaps); if (code != MetaServiceCode::OK) { return {code, msg}; } diff --git a/cloud/src/meta-service/txn_lazy_committer.h b/cloud/src/meta-service/txn_lazy_committer.h index 8ab62aff89fe35..048ff7cf3081bf 100644 --- a/cloud/src/meta-service/txn_lazy_committer.h +++ b/cloud/src/meta-service/txn_lazy_committer.h @@ -48,7 +48,8 @@ class TxnLazyCommitTask { std::pair commit_partition( int64_t db_id, int64_t partition_id, const std::vector>& tmp_rowset_metas, - bool is_versioned_write, bool is_versioned_read); + bool is_versioned_write, bool is_versioned_read, + bool defer_deleting_pending_delete_bitmaps); std::string instance_id_; int64_t txn_id_; diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index b60f330dcfeb4b..90412ce568884b 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -506,6 +506,10 @@ message TxnInfoPB { // TODO: There are more fields TBD optional bool versioned_write = 19; // versioned write, don't need to write RecycleTxnPB again optional bool versioned_read = 20; // whether to read versioned keys + // When enabled, defer deleting pending delete bitmaps until lazy commit completes. + // This is used for lazy commit to reduce contention during transaction commit. + // The delete bitmap locks will be extended and removed during lazy commit phase. + optional bool defer_deleting_pending_delete_bitmaps = 21; } // For check txn conflict and txn timeout