Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,13 +425,7 @@ void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* c
<< " us, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rowset_id.to_string();
}
int64_t expiration_time =
tablet_meta->ttl_seconds() == 0 || rs_meta.newest_write_timestamp() <= 0
? 0
: rs_meta.newest_write_timestamp() + tablet_meta->ttl_seconds();
if (expiration_time <= UnixSeconds()) {
expiration_time = 0;
}
int64_t expiration_time = tablet_meta->ttl_seconds();

if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpTriggerSource::EVENT_DRIVEN)) {
LOG(INFO) << "found duplicate warmup task for rowset " << rowset_id.to_string()
Expand Down
29 changes: 27 additions & 2 deletions be/src/cloud/cloud_storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,36 @@ bool CloudStorageEngine::stopped() {

Result<BaseTabletSPtr> CloudStorageEngine::get_tablet(int64_t tablet_id,
SyncRowsetStats* sync_stats,
bool force_use_only_cached) {
return _tablet_mgr->get_tablet(tablet_id, false, true, sync_stats, force_use_only_cached)
bool force_use_only_cached,
bool cache_on_miss) {
return _tablet_mgr
->get_tablet(tablet_id, false, true, sync_stats, force_use_only_cached, cache_on_miss)
.transform([](auto&& t) { return static_pointer_cast<BaseTablet>(std::move(t)); });
}

// Fetch the meta of tablet `tablet_id` through the cloud meta manager and store it in
// `*tablet_meta`.
//
// Returns InvalidArgument when `tablet_meta` is null, InternalError when the meta manager
// is not set, and otherwise whatever the meta manager's `get_tablet_meta` returns.
//
// NOTE: the cache-lookup path below is compiled out (`#if 0`), so `force_use_only_cached`
// currently has no effect on the result.
Status CloudStorageEngine::get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
                                           bool force_use_only_cached) {
    // The caller must supply a slot to receive the result.
    if (!tablet_meta) {
        return Status::InvalidArgument("tablet_meta output is null");
    }

#if 0
    if (_tablet_mgr && _tablet_mgr->peek_tablet_meta(tablet_id, tablet_meta)) {
        return Status::OK();
    }

    if (force_use_only_cached) {
        return Status::NotFound("tablet meta {} not found in cache", tablet_id);
    }
#endif

    // Without a meta manager there is nowhere to fetch the meta from.
    if (!_meta_mgr) {
        return Status::InternalError("cloud meta manager is not initialized");
    }
    return _meta_mgr->get_tablet_meta(tablet_id, tablet_meta);
}

Status CloudStorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) {
RETURN_IF_ERROR(Thread::create(
"CloudStorageEngine", "refresh_s3_info_thread",
Expand Down
19 changes: 18 additions & 1 deletion be/src/cloud/cloud_storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,25 @@ class CloudStorageEngine final : public BaseStorageEngine {
void stop() override;
bool stopped() override;

/* Parameters:
* - tablet_id: the id of the tablet to get
* - sync_stats: the stats of the rowset sync
* - force_use_only_cached: whether to use only the cached tablet meta
* - cache_on_miss: whether to cache the tablet meta when it is missing from the cache
*/
Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
bool force_use_only_cached = false) override;
bool force_use_only_cached = false,
bool cache_on_miss = true) override;

/*
* Get the tablet meta for a specific tablet.
* Parameters:
* - tablet_id: the id of the tablet to get the meta for
* - tablet_meta: output TabletMeta shared pointer
* - force_use_only_cached: whether to use only the cached tablet meta (return NotFound on miss)
*/
Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
bool force_use_only_cached = false) override;

Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;

Expand Down
24 changes: 1 addition & 23 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,7 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_
continue;
}

int64_t expiration_time =
_tablet_meta->ttl_seconds() == 0 ||
rowset_meta->newest_write_timestamp() <= 0
? 0
: rowset_meta->newest_write_timestamp() +
_tablet_meta->ttl_seconds();
int64_t expiration_time = _tablet_meta->ttl_seconds();
g_file_cache_cloud_tablet_submitted_segment_num << 1;
if (rs->rowset_meta()->segment_file_size(seg_id) > 0) {
g_file_cache_cloud_tablet_submitted_segment_size
Expand Down Expand Up @@ -1530,23 +1525,6 @@ Status CloudTablet::sync_meta() {
return st;
}

auto new_ttl_seconds = tablet_meta->ttl_seconds();
if (_tablet_meta->ttl_seconds() != new_ttl_seconds) {
_tablet_meta->set_ttl_seconds(new_ttl_seconds);
int64_t cur_time = UnixSeconds();
std::shared_lock rlock(_meta_lock);
for (auto& [_, rs] : _rs_version_map) {
for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
int64_t new_expiration_time =
new_ttl_seconds + rs->rowset_meta()->newest_write_timestamp();
new_expiration_time = new_expiration_time > cur_time ? new_expiration_time : 0;
auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
file_cache->modify_expiration_time(file_key, new_expiration_time);
}
}
}

auto new_compaction_policy = tablet_meta->compaction_policy();
if (_tablet_meta->compaction_policy() != new_compaction_policy) {
_tablet_meta->set_compaction_policy(new_compaction_policy);
Expand Down
47 changes: 32 additions & 15 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ void set_tablet_access_time_ms(CloudTablet* tablet) {
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data,
bool sync_delete_bitmap,
SyncRowsetStats* sync_stats,
bool force_use_only_cached) {
bool force_use_only_cached,
bool cache_on_miss) {
// LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr`
class Value : public LRUCacheValueBase {
public:
Expand Down Expand Up @@ -193,7 +194,7 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
if (sync_stats) {
++sync_stats->tablet_meta_cache_miss;
}
auto load_tablet = [this, &key, warmup_data, sync_delete_bitmap,
auto load_tablet = [this, warmup_data, sync_delete_bitmap,
sync_stats](int64_t tablet_id) -> Result<std::shared_ptr<CloudTablet>> {
TabletMetaSharedPtr tablet_meta;
auto start = std::chrono::steady_clock::now();
Expand All @@ -209,7 +210,6 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
}

auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta));
auto value = std::make_unique<Value>(tablet, *_tablet_map);
// MUST sync stats to let compaction scheduler work correctly
SyncOptions options;
options.warmup_delta_data = warmup_data;
Expand All @@ -219,16 +219,7 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st;
return ResultError(st);
}

auto* handle = _cache->insert(key, value.release(), 1, sizeof(CloudTablet),
CachePriority::NORMAL);
auto ret =
std::shared_ptr<CloudTablet>(tablet.get(), [this, handle](CloudTablet* tablet) {
set_tablet_access_time_ms(tablet);
_cache->release(handle);
});
_tablet_map->put(std::move(tablet));
return ret;
return tablet;
};

auto load_result = s_singleflight_load_tablet.load(tablet_id, std::move(load_tablet));
Expand All @@ -237,8 +228,22 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
load_result.error()));
}
auto tablet = load_result.value();
set_tablet_access_time_ms(tablet.get());
return tablet;
if (!cache_on_miss) {
set_tablet_access_time_ms(tablet.get());
return tablet;
}

auto value = std::make_unique<Value>(tablet, *_tablet_map);
auto* insert_handle =
_cache->insert(key, value.release(), 1, sizeof(CloudTablet), CachePriority::NORMAL);
auto ret = std::shared_ptr<CloudTablet>(tablet.get(),
[this, insert_handle](CloudTablet* tablet_ptr) {
set_tablet_access_time_ms(tablet_ptr);
_cache->release(insert_handle);
});
_tablet_map->put(std::move(tablet));
set_tablet_access_time_ms(ret.get());
return ret;
}
if (sync_stats) {
++sync_stats->tablet_meta_cache_hit;
Expand All @@ -252,6 +257,18 @@ Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_i
return tablet;
}

// Look up `tablet_id` in `_tablet_map` and, when an entry is present, copy its tablet meta
// into `*tablet_meta`.
//
// Returns true only when the output was filled; returns false when `tablet_meta` is null
// or the map has no entry for `tablet_id`.
bool CloudTabletMgr::peek_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) {
    if (tablet_meta != nullptr) {
        if (auto cached = _tablet_map->get(tablet_id); cached) {
            *tablet_meta = cached->tablet_meta();
            return true;
        }
    }
    // Either there is no output slot or the tablet is not in the map.
    return false;
}

void CloudTabletMgr::erase_tablet(int64_t tablet_id) {
auto tablet_id_str = std::to_string(tablet_id);
CacheKey key(tablet_id_str.data(), tablet_id_str.size());
Expand Down
15 changes: 14 additions & 1 deletion be/src/cloud/cloud_tablet_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include "common/status.h"
#include "olap/olap_common.h"
#include "olap/tablet_fwd.h"

namespace doris {

Expand All @@ -44,10 +45,22 @@ class CloudTabletMgr {

// If the tablet is in cache, return this tablet directly; otherwise will get tablet meta first,
// sync rowsets after, and download segment data in background if `warmup_data` is true.
/* Parameters:
* - tablet_id: the id of the tablet to get
* - warmup_data: whether to warm up the tablet data in the background
* - sync_delete_bitmap: whether to sync the delete bitmap when getting the tablet
* - sync_stats: the stats of the rowset sync
* - force_use_only_cached: whether to use only the cached tablet meta
* - cache_on_miss: whether to cache the tablet meta when it is missing from the cache
*/
Result<std::shared_ptr<CloudTablet>> get_tablet(int64_t tablet_id, bool warmup_data = false,
bool sync_delete_bitmap = true,
SyncRowsetStats* sync_stats = nullptr,
bool local_only = false);
bool force_use_only_cached = false,
bool cache_on_miss = true);

// Return true if cached tablet meta is found (without triggering RPC) and filled.
bool peek_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta);

void erase_tablet(int64_t tablet_id);

Expand Down
8 changes: 1 addition & 7 deletions be/src/cloud/cloud_warm_up_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,7 @@ void CloudWarmUpManager::handle_jobs() {
continue;
}

int64_t expiration_time =
tablet_meta->ttl_seconds() == 0 || rs->newest_write_timestamp() <= 0
? 0
: rs->newest_write_timestamp() + tablet_meta->ttl_seconds();
if (expiration_time <= UnixSeconds()) {
expiration_time = 0;
}
int64_t expiration_time = tablet_meta->ttl_seconds();
if (!tablet->add_rowset_warmup_state(*rs, WarmUpTriggerSource::JOB)) {
LOG(INFO) << "found duplicate warmup task for rowset " << rs->rowset_id()
<< ", skip it";
Expand Down
6 changes: 4 additions & 2 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1141,18 +1141,20 @@ DEFINE_mInt64(cache_lock_held_long_tail_threshold_us, "30000000");
DEFINE_mBool(enable_file_cache_keep_base_compaction_output, "false");
DEFINE_mBool(enable_file_cache_adaptive_write, "true");
DEFINE_mDouble(file_cache_keep_base_compaction_output_min_hit_ratio, "0.7");
DEFINE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio, "0.7");

// if difference below this threshold, we consider cache's progressive upgrading (2.0->3.0) successful
DEFINE_mDouble(file_cache_meta_store_vs_file_system_diff_num_threshold, "0.3");
DEFINE_mDouble(file_cache_keep_schema_change_output_min_hit_ratio, "0.7");

DEFINE_mInt64(file_cache_remove_block_qps_limit, "1000");
DEFINE_mInt64(file_cache_background_gc_interval_ms, "100");
DEFINE_mInt64(file_cache_background_block_lru_update_interval_ms, "5000");
DEFINE_mInt64(file_cache_background_block_lru_update_qps_limit, "1000");
DEFINE_mBool(enable_reader_dryrun_when_download_file_cache, "true");
DEFINE_mInt64(file_cache_background_monitor_interval_ms, "5000");
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "3000");
DEFINE_mInt64(file_cache_background_ttl_gc_interval_ms, "180000");
DEFINE_mInt64(file_cache_background_ttl_info_update_interval_ms, "180000");
DEFINE_mInt64(file_cache_background_tablet_id_flush_interval_ms, "1000");
DEFINE_mInt64(file_cache_background_ttl_gc_batch, "1000");
DEFINE_mInt64(file_cache_background_lru_dump_interval_ms, "60000");
// dump queue only if the queue update specific times through several dump intervals
Expand Down
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,8 @@ DECLARE_mInt64(file_cache_background_block_lru_update_qps_limit);
DECLARE_mBool(enable_reader_dryrun_when_download_file_cache);
DECLARE_mInt64(file_cache_background_monitor_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_info_update_interval_ms);
DECLARE_mInt64(file_cache_background_tablet_id_flush_interval_ms);
DECLARE_mInt64(file_cache_background_ttl_gc_batch);
DECLARE_Int32(file_cache_downloader_thread_num_min);
DECLARE_Int32(file_cache_downloader_thread_num_max);
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "exec/schema_scanner/schema_columns_scanner.h"
#include "exec/schema_scanner/schema_dummy_scanner.h"
#include "exec/schema_scanner/schema_encryption_keys_scanner.h"
#include "exec/schema_scanner/schema_file_cache_info_scanner.h"
#include "exec/schema_scanner/schema_file_cache_statistics.h"
#include "exec/schema_scanner/schema_files_scanner.h"
#include "exec/schema_scanner/schema_load_job_scanner.h"
Expand Down Expand Up @@ -258,6 +259,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return SchemaClusterSnapshotPropertiesScanner::create_unique();
case TSchemaTableType::SCH_COLUMN_DATA_SIZES:
return SchemaColumnDataSizesScanner::create_unique();
case TSchemaTableType::SCH_FILE_CACHE_INFO:
return SchemaFileCacheInfoScanner::create_unique();
default:
return SchemaDummyScanner::create_unique();
break;
Expand Down
Loading
Loading