Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cloud/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,10 @@ CONF_Int32(parallel_txn_lazy_commit_num_threads, "0"); // hardware concurrency i
CONF_mInt64(txn_lazy_max_rowsets_per_batch, "1000");
CONF_mBool(txn_lazy_commit_shuffle_partitions, "true");
CONF_Int64(txn_lazy_commit_shuffle_seed, "0"); // 0 means generate a random seed
// WARNING: All meta-servers MUST be upgraded before changing this to true.
// When enabled, defer deleting pending delete bitmaps until lazy commit completes.
// This reduces contention during transaction commit by extending delete bitmap locks.
CONF_mBool(txn_lazy_commit_defer_deleting_pending_delete_bitmaps, "false");
// max TabletIndexPB num for batch get
CONF_Int32(max_tablet_index_num_per_batch, "1000");
CONF_Int32(max_restore_job_rowsets_per_batch, "1000");
Expand Down
80 changes: 76 additions & 4 deletions cloud/src/meta-service/meta_service_txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,64 @@ void update_tablet_stats(const StatsTabletKeyInfo& info, const TabletStats& stat
}
}

// process mow table, check lock and update lock timeout
void process_mow_when_commit_txn_deferred(
const CommitTxnRequest* request, const std::string& instance_id, MetaServiceCode& code,
std::string& msg, std::unique_ptr<Transaction>& txn,
std::unordered_map<int64_t, std::vector<int64_t>>& table_id_tablet_ids) {
int64_t txn_id = request->txn_id();
std::stringstream ss;
std::vector<std::string> lock_keys;
lock_keys.reserve(request->mow_table_ids().size());
for (auto table_id : request->mow_table_ids()) {
lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}));
}
std::vector<std::optional<std::string>> lock_values;
TxnErrorCode err = txn->batch_get(&lock_values, lock_keys);
if (err != TxnErrorCode::TXN_OK) {
ss << "failed to get delete bitmap update lock key info, instance_id=" << instance_id
<< " err=" << err;
msg = ss.str();
code = cast_as<ErrCategory::READ>(err);
LOG(WARNING) << msg << " txn_id=" << txn_id;
return;
}
size_t total_locks = lock_keys.size();
for (size_t i = 0; i < total_locks; i++) {
int64_t table_id = request->mow_table_ids(i);
// When the key does not exist, it means the lock has been acquired
// by another transaction and successfully committed.
if (!lock_values[i].has_value()) {
ss << "get delete bitmap update lock info, lock is expired"
<< " table_id=" << table_id << " key=" << hex(lock_keys[i]) << " txn_id=" << txn_id;
code = MetaServiceCode::LOCK_EXPIRED;
msg = ss.str();
LOG(WARNING) << msg << " txn_id=" << txn_id;
return;
}

DeleteBitmapUpdateLockPB lock_info;
if (!lock_info.ParseFromString(lock_values[i].value())) [[unlikely]] {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "failed to parse DeleteBitmapUpdateLockPB";
LOG(WARNING) << msg << " txn_id=" << txn_id;
return;
}
if (lock_info.lock_id() != request->txn_id()) {
ss << "lock is expired, locked by lock_id=" << lock_info.lock_id();
msg = ss.str();
code = MetaServiceCode::LOCK_EXPIRED;
return;
}
lock_info.set_expiration(std::numeric_limits<int64_t>::max());
txn->put(lock_keys[i], lock_info.SerializeAsString());
LOG(INFO) << "refresh delete bitmap lock, lock_key=" << hex(lock_keys[i])
<< " table_id=" << table_id << " txn_id=" << txn_id;
}
lock_keys.clear();
lock_values.clear();
}

// process mow table, check lock and remove pending key
void process_mow_when_commit_txn(
const CommitTxnRequest* request, const std::string& instance_id, MetaServiceCode& code,
Expand Down Expand Up @@ -2396,10 +2454,24 @@ void MetaServiceImpl::commit_txn_eventually(
for (auto& [tablet_id, tablet_idx] : tablet_ids) {
table_id_tablet_ids[tablet_idx.table_id()].push_back(tablet_id);
}
process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
return;
if (config::txn_lazy_commit_defer_deleting_pending_delete_bitmaps) {
txn_info.clear_table_ids();
for (auto& [table_id, _] : table_id_tablet_ids) {
txn_info.add_table_ids(table_id);
}
txn_info.set_defer_deleting_pending_delete_bitmaps(true);
process_mow_when_commit_txn_deferred(request, instance_id, code, msg, txn,
table_id_tablet_ids);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
return;
}
} else {
process_mow_when_commit_txn(request, instance_id, code, msg, txn, table_id_tablet_ids);
if (code != MetaServiceCode::OK) {
LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" << code;
return;
}
}

// table_id -> version, for response
Expand Down
59 changes: 52 additions & 7 deletions cloud/src/meta-service/txn_lazy_committer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ void convert_tmp_rowsets(
MetaServiceCode& code, std::string& msg, int64_t db_id,
std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta,
std::map<int64_t, TabletIndexPB>& tablet_ids, bool is_versioned_write,
bool is_versioned_read, Versionstamp versionstamp, ResourceManager* resource_mgr) {
bool is_versioned_read, Versionstamp versionstamp, ResourceManager* resource_mgr,
bool defer_deleting_pending_delete_bitmaps) {
std::stringstream ss;
std::unique_ptr<Transaction> txn;
TxnErrorCode err = txn_kv->create_txn(&txn);
Expand Down Expand Up @@ -483,6 +484,15 @@ void convert_tmp_rowsets(
}
}

if (defer_deleting_pending_delete_bitmaps) {
for (auto& [tablet_id, _] : tablet_stats) {
std::string pending_key = meta_pending_delete_bitmap_key({instance_id, tablet_id});
txn->remove(pending_key);
LOG(INFO) << "remove delete bitmap pending key, pending_key=" << hex(pending_key)
<< " txn_id=" << txn_id;
}
}

TEST_SYNC_POINT_RETURN_WITH_VOID("convert_tmp_rowsets::before_commit", &code);
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
Expand All @@ -495,7 +505,7 @@ void convert_tmp_rowsets(

void make_committed_txn_visible(const std::string& instance_id, int64_t db_id, int64_t txn_id,
std::shared_ptr<TxnKv> txn_kv, MetaServiceCode& code,
std::string& msg) {
std::string& msg, bool defer_deleting_pending_delete_bitmaps) {
// 1. visible txn info
// 2. remove running key and put recycle txn key

Expand Down Expand Up @@ -551,6 +561,35 @@ void make_committed_txn_visible(const std::string& instance_id, int64_t db_id, i
LOG(INFO) << "remove running_key=" << hex(running_key) << " txn_id=" << txn_id;
txn->remove(running_key);

// Remove delete bitmap locks if deferring deletion
if (defer_deleting_pending_delete_bitmaps) {
for (int64_t table_id : txn_info.table_ids()) {
std::string lock_key =
meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
// Read the lock first to check if it still belongs to the current txn
std::string lock_val;
TxnErrorCode err = txn->get(lock_key, &lock_val);
if (err == TxnErrorCode::TXN_OK) {
DeleteBitmapUpdateLockPB lock_info;
if (lock_info.ParseFromString(lock_val)) {
// Only remove the lock if it still belongs to the current txn
if (lock_info.lock_id() == txn_id) {
txn->remove(lock_key);
LOG(INFO) << "remove delete bitmap lock, lock_key=" << hex(lock_key)
<< " table_id=" << table_id << " txn_id=" << txn_id;
} else {
LOG(WARNING) << "delete bitmap lock is held by another txn, "
<< "lock_key=" << hex(lock_key) << " table_id=" << table_id
<< " expected_txn_id=" << txn_id
<< " actual_lock_id=" << lock_info.lock_id();
}
}
} else if (err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
LOG(WARNING) << "failed to get delete bitmap lock, lock_key=" << hex(lock_key)
<< " table_id=" << table_id << " err=" << err;
}
}
}
// The recycle txn pb will be written when recycle the commit txn log,
// if the txn versioned write is enabled.
if (!txn_info.versioned_write()) {
Expand Down Expand Up @@ -635,6 +674,7 @@ void TxnLazyCommitTask::commit() {
return;
}

bool defer_deleting_pending_delete_bitmaps = txn_info.defer_deleting_pending_delete_bitmaps();
bool is_versioned_write = txn_info.versioned_write();
bool is_versioned_read = txn_info.versioned_read();
CloneChainReader meta_reader(instance_id_, txn_kv_.get(),
Expand Down Expand Up @@ -690,7 +730,8 @@ void TxnLazyCommitTask::commit() {
executor.add([&, partition_id, this]() {
return commit_partition(db_id, partition_id,
partition_to_tmp_rowset_metas.at(partition_id),
is_versioned_read, is_versioned_write);
is_versioned_read, is_versioned_write,
defer_deleting_pending_delete_bitmaps);
});
}
bool finished = false;
Expand All @@ -711,7 +752,8 @@ void TxnLazyCommitTask::commit() {
for (int64_t partition_id : partition_ids) {
std::tie(code_, msg_) = commit_partition(
db_id, partition_id, partition_to_tmp_rowset_metas[partition_id],
is_versioned_read, is_versioned_write);
is_versioned_read, is_versioned_write,
defer_deleting_pending_delete_bitmaps);
if (code_ != MetaServiceCode::OK) break;
}
}
Expand All @@ -720,7 +762,8 @@ void TxnLazyCommitTask::commit() {
LOG(WARNING) << "txn_id=" << txn_id_ << " code=" << code_ << " msg=" << msg_;
break;
}
make_committed_txn_visible(instance_id_, db_id, txn_id_, txn_kv_, code_, msg_);
make_committed_txn_visible(instance_id_, db_id, txn_id_, txn_kv_, code_, msg_,
defer_deleting_pending_delete_bitmaps);
} while (false);
} while (code_ == MetaServiceCode::KV_TXN_CONFLICT &&
retry_times++ < config::txn_store_retry_times);
Expand All @@ -729,7 +772,8 @@ void TxnLazyCommitTask::commit() {
std::pair<MetaServiceCode, std::string> TxnLazyCommitTask::commit_partition(
int64_t db_id, int64_t partition_id,
const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowset_metas,
bool is_versioned_read, bool is_versioned_write) {
bool is_versioned_read, bool is_versioned_write,
bool defer_deleting_pending_delete_bitmaps) {
std::stringstream ss;
CloneChainReader meta_reader(instance_id_, txn_kv_.get(),
txn_lazy_committer_->resource_manager().get());
Expand Down Expand Up @@ -802,7 +846,8 @@ std::pair<MetaServiceCode, std::string> TxnLazyCommitTask::commit_partition(
convert_tmp_rowsets(instance_id_, txn_id_, txn_kv_, code, msg, db_id,
sub_partition_tmp_rowset_metas, tablet_ids, is_versioned_write,
is_versioned_read, versionstamp,
txn_lazy_committer_->resource_manager().get());
txn_lazy_committer_->resource_manager().get(),
defer_deleting_pending_delete_bitmaps);
if (code != MetaServiceCode::OK) {
return {code, msg};
}
Expand Down
3 changes: 2 additions & 1 deletion cloud/src/meta-service/txn_lazy_committer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ class TxnLazyCommitTask {
std::pair<MetaServiceCode, std::string> commit_partition(
int64_t db_id, int64_t partition_id,
const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowset_metas,
bool is_versioned_write, bool is_versioned_read);
bool is_versioned_write, bool is_versioned_read,
bool defer_deleting_pending_delete_bitmaps);

std::string instance_id_;
int64_t txn_id_;
Expand Down
4 changes: 4 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,10 @@ message TxnInfoPB {
// TODO: There are more fields TBD
optional bool versioned_write = 19; // versioned write, don't need to write RecycleTxnPB again
optional bool versioned_read = 20; // whether to read versioned keys
// When enabled, defer deleting pending delete bitmaps until lazy commit completes.
// This is used for lazy commit to reduce contention during transaction commit.
// The delete bitmap locks will be extended and removed during lazy commit phase.
optional bool defer_deleting_pending_delete_bitmaps = 21;
}

// For check txn conflict and txn timeout
Expand Down
Loading