Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cloud/src/common/bvars.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ BvarLatencyRecorderWithTag g_bvar_ms_abort_snapshot("ms", "abort_snapshot");
BvarLatencyRecorderWithTag g_bvar_ms_drop_snapshot("ms", "drop_snapshot");
BvarLatencyRecorderWithTag g_bvar_ms_list_snapshot("ms", "list_snapshot");
BvarLatencyRecorderWithTag g_bvar_ms_clone_instance("ms", "clone_instance");
BvarLatencyRecorderWithTag g_bvar_ms_compact_snapshot("ms", "compact_snapshot");
BvarLatencyRecorderWithTag g_bvar_ms_update_packed_file_info("ms", "update_packed_file_info");
bvar::Adder<int64_t> g_bvar_update_delete_bitmap_fail_counter;
bvar::Window<bvar::Adder<int64_t> > g_bvar_update_delete_bitmap_fail_counter_minute("ms", "update_delete_bitmap_fail", &g_bvar_update_delete_bitmap_fail_counter, 60);
Expand Down Expand Up @@ -474,6 +475,9 @@ mBvarInt64Adder g_bvar_rpc_kv_drop_snapshot_del_counter("rpc_kv_drop_snapshot_de
mBvarInt64Adder g_bvar_rpc_kv_clone_instance_get_counter("rpc_kv_clone_instance_get_counter",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_clone_instance_put_counter("rpc_kv_clone_instance_put_counter",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_clone_instance_del_counter("rpc_kv_clone_instance_del_counter",{"instance_id"});
// compact_snapshot
mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_get_counter("rpc_kv_compact_snapshot_get_counter",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_put_counter("rpc_kv_compact_snapshot_put_counter",{"instance_id"});

// bytes
// get_rowset
Expand Down Expand Up @@ -679,5 +683,8 @@ mBvarInt64Adder g_bvar_rpc_kv_list_snapshot_del_bytes("rpc_kv_list_snapshot_del_
mBvarInt64Adder g_bvar_rpc_kv_clone_instance_get_bytes("rpc_kv_clone_instance_get_bytes",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_clone_instance_put_bytes("rpc_kv_clone_instance_put_bytes",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_clone_instance_del_bytes("rpc_kv_clone_instance_del_bytes",{"instance_id"});
// compact_snapshot
mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_get_bytes("rpc_kv_compact_snapshot_get_bytes",{"instance_id"});
mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_put_bytes("rpc_kv_compact_snapshot_put_bytes",{"instance_id"});

// clang-format on
5 changes: 5 additions & 0 deletions cloud/src/common/bvars.h
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ extern BvarLatencyRecorderWithTag g_bvar_ms_abort_snapshot;
extern BvarLatencyRecorderWithTag g_bvar_ms_drop_snapshot;
extern BvarLatencyRecorderWithTag g_bvar_ms_list_snapshot;
extern BvarLatencyRecorderWithTag g_bvar_ms_clone_instance;
extern BvarLatencyRecorderWithTag g_bvar_ms_compact_snapshot;
extern BvarLatencyRecorderWithTag g_bvar_ms_update_packed_file_info;
extern bvar::Adder<int64_t> g_bvar_update_delete_bitmap_fail_counter;
extern bvar::Adder<int64_t> g_bvar_get_delete_bitmap_fail_counter;
Expand Down Expand Up @@ -900,6 +901,8 @@ extern mBvarInt64Adder g_bvar_rpc_kv_drop_snapshot_del_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_put_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_del_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_get_counter;
extern mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_put_counter;

extern mBvarInt64Adder g_bvar_rpc_kv_get_rowset_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_get_version_get_bytes;
Expand Down Expand Up @@ -1039,6 +1042,8 @@ extern mBvarInt64Adder g_bvar_rpc_kv_list_snapshot_del_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_put_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_clone_instance_del_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_get_bytes;
extern mBvarInt64Adder g_bvar_rpc_kv_compact_snapshot_put_bytes;

// meta ranges
extern mBvarStatus<int64_t> g_bvar_fdb_kv_ranges_count;
10 changes: 10 additions & 0 deletions cloud/src/meta-service/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,10 @@ class MetaServiceImpl : public cloud::MetaService {
const CloneInstanceRequest* request, CloneInstanceResponse* response,
::google::protobuf::Closure* done) override;

void compact_snapshot(::google::protobuf::RpcController* controller,
const CompactSnapshotRequest* request, CompactSnapshotResponse* response,
::google::protobuf::Closure* done) override;

private:
std::pair<MetaServiceCode, std::string> alter_instance(
const AlterInstanceRequest* request,
Expand Down Expand Up @@ -998,6 +1002,12 @@ class MetaServiceProxy final : public MetaService {
call_impl(&cloud::MetaService::clone_instance, controller, request, response, done);
}

void compact_snapshot(::google::protobuf::RpcController* controller,
const CompactSnapshotRequest* request, CompactSnapshotResponse* response,
::google::protobuf::Closure* done) override {
call_impl(&cloud::MetaService::compact_snapshot, controller, request, response, done);
}

private:
template <typename Request, typename Response>
using MetaServiceMethod = void (cloud::MetaService::*)(::google::protobuf::RpcController*,
Expand Down
15 changes: 15 additions & 0 deletions cloud/src/meta-service/meta_service_http.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,19 @@ static HttpResponse process_list_snapshot(MetaServiceImpl* service, brpc::Contro
return http_json_reply_message(resp.status(), resp);
}

static HttpResponse process_compact_snapshot(MetaServiceImpl* service, brpc::Controller* ctrl) {
auto& uri = ctrl->http_request().uri();
std::string instance_id(http_query(uri, "instance_id"));
if (instance_id.empty()) {
return http_json_reply(MetaServiceCode::INVALID_ARGUMENT, "instance_id is empty");
}
CompactSnapshotRequest req;
req.set_instance_id(instance_id);
CompactSnapshotResponse resp;
service->compact_snapshot(ctrl, &req, &resp, nullptr);
return http_json_reply(resp.status());
}

static HttpResponse process_set_snapshot_property(MetaServiceImpl* service,
brpc::Controller* ctrl) {
AlterInstanceRequest req;
Expand Down Expand Up @@ -972,6 +985,8 @@ void MetaServiceImpl::http(::google::protobuf::RpcController* controller,
{"v1/set_snapshot_property", process_set_snapshot_property},
{"v1/get_snapshot_property", process_get_snapshot_property},
{"v1/set_multi_version_status", process_set_multi_version_status},
{"compact_snapshot", process_compact_snapshot},
{"v1/compact_snapshot", process_compact_snapshot},
// misc
{"abort_txn", process_abort_txn},
{"abort_tablet_job", process_abort_tablet_job},
Expand Down
19 changes: 15 additions & 4 deletions cloud/src/meta-service/meta_service_partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ void MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
IndexIndexPB index_index_pb;
index_index_pb.set_db_id(db_id);
index_index_pb.set_table_id(table_id);
LOG(INFO) << index_index_pb.DebugString();
std::string index_index_value;
if (!index_index_pb.SerializeToString(&index_index_value)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
Expand All @@ -284,6 +283,11 @@ void MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
versioned_put(txn.get(), index_meta_key, "");
txn->put(index_inverted_key, "");
txn->put(index_index_key, index_index_value);
LOG_INFO("put versioned index keys")
.tag("index_id", index_id)
.tag("index_meta_key", hex(index_meta_key))
.tag("index_index_key", hex(index_index_key))
.tag("index_inverted_key", hex(index_inverted_key));

commit_index_log.add_index_ids(index_id);
}
Expand Down Expand Up @@ -313,8 +317,11 @@ void MetaServiceImpl::commit_index(::google::protobuf::RpcController* controller
txn->put(part_inverted_index_key, "");
txn->put(part_index_key, part_index_value);

LOG(INFO) << "xxx put versioned partition index key=" << hex(part_index_key)
<< " partition_id=" << partition_id;
LOG_INFO("put versioned partition index key")
.tag("partition_id", partition_id)
.tag("part_meta_key", hex(part_meta_key))
.tag("part_index_key", hex(part_index_key))
.tag("part_inverted_index_key", hex(part_inverted_index_key));

commit_index_log.add_partition_ids(partition_id);
}
Expand Down Expand Up @@ -809,7 +816,6 @@ void MetaServiceImpl::commit_partition_internal(const PartitionRequest* request,
PartitionIndexPB part_index_pb;
part_index_pb.set_db_id(db_id);
part_index_pb.set_table_id(table_id);
LOG(INFO) << part_index_pb.DebugString();
std::string part_index_value;
if (!part_index_pb.SerializeToString(&part_index_value)) {
code = MetaServiceCode::PROTOBUF_SERIALIZE_ERR;
Expand All @@ -820,6 +826,11 @@ void MetaServiceImpl::commit_partition_internal(const PartitionRequest* request,
versioned_put(txn.get(), part_meta_key, "");
txn->put(part_inverted_index_key, "");
txn->put(part_index_key, part_index_value);
LOG_INFO("put versioned partition index key")
.tag("partition_id", part_id)
.tag("part_meta_key", hex(part_meta_key))
.tag("part_index_key", hex(part_index_key))
.tag("part_inverted_index_key", hex(part_inverted_index_key));

commit_partition_log.add_partition_ids(part_id);
}
Expand Down
2 changes: 1 addition & 1 deletion cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2547,7 +2547,7 @@ std::pair<MetaServiceCode, std::string> MetaServiceImpl::alter_instance(
LOG(WARNING) << msg << " err=" << err;
return std::make_pair(code, msg);
}
LOG(INFO) << "alter instance key=" << hex(key);
LOG(INFO) << "alter instance key=" << hex(key) << " instance_id=" << instance_id;
InstanceInfoPB instance;
if (!instance.ParseFromString(val)) {
msg = "failed to parse InstanceInfoPB";
Expand Down
26 changes: 26 additions & 0 deletions cloud/src/meta-service/meta_service_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,4 +184,30 @@ void MetaServiceImpl::drop_snapshot(::google::protobuf::RpcController* controlle
msg = response->status().msg();
}

void MetaServiceImpl::compact_snapshot(::google::protobuf::RpcController* controller,
const CompactSnapshotRequest* request,
CompactSnapshotResponse* response,
::google::protobuf::Closure* done) {
RPC_PREPROCESS(compact_snapshot, get, put);

if (!request->instance_id().empty()) {
instance_id = request->instance_id();
} else if (request->has_cloud_unique_id() && !request->cloud_unique_id().empty()) {
instance_id = get_instance_id(resource_mgr_, request->cloud_unique_id());
if (instance_id.empty()) {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "empty instance_id";
return;
}
} else {
code = MetaServiceCode::INVALID_ARGUMENT;
msg = "instance_id or cloud_unique_id is required";
return;
}

RPC_RATE_LIMIT(compact_snapshot);

std::tie(code, msg) = snapshot_manager_->compact_snapshot(instance_id);
}

} // namespace doris::cloud
1 change: 1 addition & 0 deletions cloud/src/recycler/recycler_operation_log.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ using namespace std::chrono;
int OperationLogRecycleChecker::init() {
source_snapshot_versionstamp_ = Versionstamp::min();
if (instance_info_.has_source_snapshot_id() &&
instance_info_.snapshot_compact_status() != SNAPSHOT_COMPACT_DONE &&
!SnapshotManager::parse_snapshot_versionstamp(instance_info_.source_snapshot_id(),
&source_snapshot_versionstamp_)) {
LOG_WARNING("failed to parse versionstamp from source snapshot id")
Expand Down
18 changes: 15 additions & 3 deletions cloud/src/recycler/snapshot_chain_compactor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,16 @@ int is_instance_cloned(TxnKv* txn_kv, const std::string& instance_id, bool* is_c
}

bool SnapshotChainCompactor::is_snapshot_chain_need_compact(const InstanceInfoPB& instance_info) {
// Skip instances that have already completed compact
if (instance_info.snapshot_compact_status() == SnapshotCompactStatus::SNAPSHOT_COMPACT_DONE) {
return false;
}

// Instances with DOING status should be compacted (manually triggered)
if (instance_info.snapshot_compact_status() == SnapshotCompactStatus::SNAPSHOT_COMPACT_DOING) {
return true;
}

// compact the instance which meets the following conditions:
// 1. the instance is cloned from snapshot
// 2. its source instance is not cloned from other snapshots
Expand All @@ -247,7 +257,9 @@ bool SnapshotChainCompactor::is_snapshot_chain_need_compact(const InstanceInfoPB
<< instance_info.source_instance_id();
return false;
}
if (is_instance_cloned_from_snapshot(source_instance_info)) {
if (is_instance_cloned_from_snapshot(source_instance_info) &&
source_instance_info.snapshot_compact_status() !=
SnapshotCompactStatus::SNAPSHOT_COMPACT_DONE) {
return false;
}

Expand Down Expand Up @@ -435,8 +447,8 @@ int InstanceChainCompactor::handle_compaction_completion() {
std::string reference_key = versioned::snapshot_reference_key(ref_key_info);
txn->remove(reference_key);

// instance_info.clear_source_instance_id();
instance_info.clear_source_snapshot_id();
// Preserve source_instance_id and source_snapshot_id, mark compact as done
instance_info.set_snapshot_compact_status(SnapshotCompactStatus::SNAPSHOT_COMPACT_DONE);
instance_info.clear_compacted_key_sets();
txn->atomic_add(system_meta_service_instance_update_key(), 1);
txn->put(key, instance_info.SerializeAsString());
Expand Down
21 changes: 13 additions & 8 deletions cloud/src/resource-manager/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ std::pair<MetaServiceCode, std::string> ResourceManager::add_cluster(const std::

txn->atomic_add(system_meta_service_instance_update_key(), 1);
txn->put(key, val);
LOG(INFO) << "put instance_key=" << hex(key);
LOG(INFO) << "put instance_key=" << hex(key) << " instance_id=" << instance_id;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
msg = "failed to commit kv txn";
Expand All @@ -618,8 +618,8 @@ std::pair<MetaServiceCode, std::string> ResourceManager::add_cluster(const std::
}

/**
* The current implementation is to add fe clusters through HTTP API,
* such as follower nodes `ABC` in the cluster, and then immediately drop follower node `A`, while fe is not yet pulled up,
* The current implementation is to add fe clusters through HTTP API,
* such as follower nodes `ABC` in the cluster, and then immediately drop follower node `A`, while fe is not yet pulled up,
* which may result in the formation of a multi master fe cluster
* This function provides a simple protection mechanism that does not allow dropping the fe node within 5 minutes after adding it through the API(add_cluster/add_node).
* If you bypass this protection and do the behavior described above, god bless you.
Expand Down Expand Up @@ -754,7 +754,7 @@ std::pair<MetaServiceCode, std::string> ResourceManager::drop_cluster(

txn->atomic_add(system_meta_service_instance_update_key(), 1);
txn->put(key, val);
LOG(INFO) << "put instance_key=" << hex(key);
LOG(INFO) << "put instance_key=" << hex(key) << " instance_id=" << instance_id;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
msg = "failed to commit kv txn";
Expand Down Expand Up @@ -894,7 +894,7 @@ std::string ResourceManager::update_cluster(

txn->atomic_add(system_meta_service_instance_update_key(), 1);
txn->put(key, val);
LOG(INFO) << "put instanace_key=" << hex(key);
LOG(INFO) << "put instanace_key=" << hex(key) << " instance_id=" << instance_id;
TxnErrorCode err_code = txn->commit();
if (err_code != TxnErrorCode::TXN_OK) {
msg = "failed to commit kv txn";
Expand Down Expand Up @@ -958,7 +958,7 @@ std::pair<TxnErrorCode, std::string> ResourceManager::get_instance(std::shared_p
}

TxnErrorCode err = txn->get(key, &val);
LOG(INFO) << "get instance_key=" << hex(key);
LOG(INFO) << "get instance_key=" << hex(key) << " instance_id=" << instance_id;

if (err != TxnErrorCode::TXN_OK) {
code = err;
Expand All @@ -969,7 +969,7 @@ std::pair<TxnErrorCode, std::string> ResourceManager::get_instance(std::shared_p

if (inst_pb != nullptr && !inst_pb->ParseFromString(val)) {
code = TxnErrorCode::TXN_UNIDENTIFIED_ERROR;
msg = "failed to parse InstanceInfoPB";
msg = "failed to parse InstanceInfoPB, instance_id=" + instance_id;
return ec;
}

Expand Down Expand Up @@ -1364,7 +1364,7 @@ std::string ResourceManager::modify_nodes(const std::string& instance_id,

txn->atomic_add(system_meta_service_instance_update_key(), 1);
txn->put(key, val);
LOG(INFO) << "put instance_key=" << hex(key);
LOG(INFO) << "put instance_key=" << hex(key) << " instance_id=" << instance_id;
TxnErrorCode err_code = txn->commit();
if (err_code != TxnErrorCode::TXN_OK) {
msg = "failed to commit kv txn";
Expand Down Expand Up @@ -1443,6 +1443,11 @@ void ResourceManager::refresh_instance(const std::string& instance_id,
}

if (instance.has_source_instance_id() && !instance.source_instance_id().empty()) {
// Instances that have completed snapshot compact should not be in the map
if (instance.snapshot_compact_status() == SnapshotCompactStatus::SNAPSHOT_COMPACT_DONE) {
instance_source_snapshot_info_.erase(instance_id);
return;
}
Versionstamp versionstamp;
if (!SnapshotManager::parse_snapshot_versionstamp(instance.source_snapshot_id(),
&versionstamp)) {
Expand Down
5 changes: 5 additions & 0 deletions cloud/src/snapshot/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ void SnapshotManager::clone_instance(const CloneInstanceRequest& request,
response->mutable_status()->set_msg("Not implemented");
}

std::pair<MetaServiceCode, std::string> SnapshotManager::compact_snapshot(
std::string_view instance_id) {
return {MetaServiceCode::UNDEFINED_ERR, "Not implemented"};
}

std::pair<MetaServiceCode, std::string> SnapshotManager::set_multi_version_status(
std::string_view instance_id, MultiVersionStatus multi_version_status) {
return {MetaServiceCode::UNDEFINED_ERR, "Not implemented"};
Expand Down
3 changes: 3 additions & 0 deletions cloud/src/snapshot/snapshot_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class SnapshotManager {
virtual void clone_instance(const CloneInstanceRequest& request,
CloneInstanceResponse* response);

// Manually trigger snapshot compact for an instance.
virtual std::pair<MetaServiceCode, std::string> compact_snapshot(std::string_view instance_id);

virtual std::pair<MetaServiceCode, std::string> set_multi_version_status(
std::string_view instance_id, MultiVersionStatus multi_version_status);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,8 @@ public long commitMaterializedIndex(long dbId, long tableId, List<Long> indexIds
if (partitionIds != null) {
indexRequestBuilder.addAllPartitionIds(partitionIds);
}
LOG.debug("committing materialized index for tableId: {}, partitionIds: {}, indexIds: {}",
tableId, partitionIds, indexIds);
final Cloud.IndexRequest indexRequest = indexRequestBuilder.build();

Cloud.IndexResponse response = null;
Expand Down
Loading
Loading