Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include <bthread/condition_variable.h>
#include <bthread/mutex.h>
#include <bthread/unstable.h>
#include <butil/time.h>
#include <bvar/bvar.h>
#include <bvar/reducer.h>

Expand Down Expand Up @@ -807,10 +809,39 @@ void CloudWarmUpManager::record_balanced_tablet(int64_t tablet_id, const std::st
meta.brpc_port = brpc_port;
shard.tablets.emplace(tablet_id, std::move(meta));
g_balance_tablet_be_mapping_size << 1;
schedule_remove_balanced_tablet(tablet_id);
VLOG_DEBUG << "Recorded balanced warm up cache tablet: tablet_id=" << tablet_id
<< ", host=" << host << ":" << brpc_port;
}

void CloudWarmUpManager::schedule_remove_balanced_tablet(int64_t tablet_id) {
// Use std::make_unique to avoid raw pointer allocation
auto tablet_id_ptr = std::make_unique<int64_t>(tablet_id);
unsigned long expired_ms = g_tablet_report_inactive_duration_ms;
if (doris::config::cache_read_from_peer_expired_seconds > 0 &&
doris::config::cache_read_from_peer_expired_seconds <=
g_tablet_report_inactive_duration_ms / 1000) {
expired_ms = doris::config::cache_read_from_peer_expired_seconds * 1000;
}
bthread_timer_t timer_id;
// ATTN: The timer callback will reclaim ownership of the tablet_id_ptr, so we need to release it after the timer is added.
if (const int rc = bthread_timer_add(&timer_id, butil::milliseconds_from_now(expired_ms),
clean_up_expired_mappings, tablet_id_ptr.get());
rc == 0) {
tablet_id_ptr.release();
} else {
LOG(WARNING) << "Fail to add timer for clean up expired mappings for tablet_id="
<< tablet_id << " rc=" << rc;
}
}

void CloudWarmUpManager::clean_up_expired_mappings(void* arg) {
std::unique_ptr<int64_t> tid(static_cast<int64_t*>(arg));
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.remove_balanced_tablet(*tid);
VLOG_DEBUG << "Removed expired balanced warm up cache tablet: tablet_id=" << *tid;
}

std::optional<std::pair<std::string, int32_t>> CloudWarmUpManager::get_balanced_tablet_info(
int64_t tablet_id) {
auto& shard = get_shard(tablet_id);
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_warm_up_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ class CloudWarmUpManager {
std::unordered_map<int64_t, std::pair<std::string, int32_t>> get_all_balanced_tablets() const;

private:
void schedule_remove_balanced_tablet(int64_t tablet_id);
static void clean_up_expired_mappings(void* arg);
void handle_jobs();

Status _do_warm_up_rowset(RowsetMeta& rs_meta, std::vector<TReplicaInfo>& replicas,
Expand Down
29 changes: 1 addition & 28 deletions be/src/io/cache/block_file_cache_downloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,14 +173,6 @@ std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTable
return id_to_rowset_meta_map;
}

static void clean_up_expired_mappings(void* arg) {
// Reclaim ownership with unique_ptr for automatic memory management
std::unique_ptr<int64_t> tablet_id(static_cast<int64_t*>(arg));
auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
manager.remove_balanced_tablet(*tablet_id);
VLOG_DEBUG << "Removed expired balanced warm up cache tablet: tablet_id=" << *tablet_id;
}

void FileCacheBlockDownloader::download_file_cache_block(
const DownloadTask::FileCacheBlockMetaVec& metas) {
std::unordered_set<int64_t> synced_tablets;
Expand Down Expand Up @@ -236,27 +228,8 @@ void FileCacheBlockDownloader::download_file_cache_block(
<< "]";
}
}
// Use std::make_unique to avoid raw pointer allocation
auto tablet_id_ptr = std::make_unique<int64_t>(tablet_id);
unsigned long expired_ms = g_tablet_report_inactive_duration_ms;
if (doris::config::cache_read_from_peer_expired_seconds > 0 &&
doris::config::cache_read_from_peer_expired_seconds <=
g_tablet_report_inactive_duration_ms / 1000) {
expired_ms = doris::config::cache_read_from_peer_expired_seconds * 1000;
}
bthread_timer_t timer_id;
// ATTN: The timer callback will reclaim ownership of the tablet_id_ptr, so we need to release it after the timer is added.
if (const int rc =
bthread_timer_add(&timer_id, butil::milliseconds_from_now(expired_ms),
clean_up_expired_mappings, tablet_id_ptr.get());
rc == 0) {
tablet_id_ptr.release();
} else {
LOG(WARNING) << "Fail to add timer for clean up expired mappings for tablet_id="
<< tablet_id << " rc=" << rc;
}
LOG(INFO) << "download_file_cache_block: download_done, tablet_Id=" << tablet_id
<< " status=" << st.to_string() << " expired_ms=" << expired_ms;
<< " status=" << st.to_string();
};

std::string path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ suite('test_balance_use_compute_group_properties', 'docker') {
'report_tablet_interval_seconds=1',
'schedule_sync_tablets_interval_s=18000',
'disable_auto_compaction=true',
'sys_log_verbose_modules=*'
'sys_log_verbose_modules=*',
'enable_packed_file=false',
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ suite('test_balance_warm_up', 'docker') {
'disable_auto_compaction=true',
'sys_log_verbose_modules=*',
'cache_read_from_peer_expired_seconds=100',
'enable_cache_read_from_peer=true'
'enable_cache_read_from_peer=true',
'enable_packed_file=false',
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ suite('test_balance_warm_up_sync_cache', 'docker') {
'report_tablet_interval_seconds=1',
'schedule_sync_tablets_interval_s=18000',
'disable_auto_compaction=true',
'sys_log_verbose_modules=*'
'sys_log_verbose_modules=*',
'enable_packed_file=false',
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ suite('test_balance_warm_up_task_abnormal', 'docker') {
'report_tablet_interval_seconds=1',
'schedule_sync_tablets_interval_s=18000',
'disable_auto_compaction=true',
'sys_log_verbose_modules=*'
'sys_log_verbose_modules=*',
'enable_packed_file=false',
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ suite('test_balance_warm_up_use_peer_cache', 'docker') {
'disable_auto_compaction=true',
'sys_log_verbose_modules=*',
'cache_read_from_peer_expired_seconds=100',
'enable_cache_read_from_peer=true'
'enable_cache_read_from_peer=true',
'enable_packed_file=false',
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ suite('test_balance_warm_up_with_compaction_use_peer_cache', 'docker') {
'sys_log_verbose_modules=*',
'cumulative_compaction_min_deltas=5',
'cache_read_from_peer_expired_seconds=100',
'enable_cache_read_from_peer=true'
'enable_cache_read_from_peer=true',
'enable_packed_file=false',
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ suite('test_peer_read_async_warmup', 'docker') {
'disable_auto_compaction=true',
'sys_log_verbose_modules=*',
'enable_cache_read_from_peer=true',
'enable_packed_file=false',
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ suite('test_clean_stale_rs_file_cache', 'docker') {
'cumulative_compaction_min_deltas=5',
'tablet_rowset_stale_sweep_by_size=false',
'tablet_rowset_stale_sweep_time_sec=60',
'vacuum_stale_rowsets_interval_s=10'
'vacuum_stale_rowsets_interval_s=10',
'enable_packed_file=false',
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ suite('test_clean_stale_rs_index_file_cache', 'docker') {
'cumulative_compaction_min_deltas=5',
'tablet_rowset_stale_sweep_by_size=false',
'tablet_rowset_stale_sweep_time_sec=60',
'vacuum_stale_rowsets_interval_s=10'
'vacuum_stale_rowsets_interval_s=10',
'enable_packed_file=false',
]
options.setFeNum(1)
options.setBeNum(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ suite('test_clean_tablet_when_drop_force_table', 'docker') {
'report_tablet_interval_seconds=1',
'write_buffer_size=10240',
'write_buffer_size_for_agg=10240',
'sys_log_verbose_modules=task_worker_pool'
'sys_log_verbose_modules=task_worker_pool',
'enable_packed_file=false',
]
options.setFeNum(3)
options.setBeNum(3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ suite('test_clean_tablet_when_rebalance', 'docker') {
options.beConfigs += [
'report_tablet_interval_seconds=1',
'write_buffer_size=10240',
'write_buffer_size_for_agg=10240'
'write_buffer_size_for_agg=10240',
'enable_packed_file=false',
]
options.setFeNum(3)
options.setBeNum(3)
Expand Down
Loading