Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions be/src/cloud/cloud_internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,57 @@ bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
"file_cache_get_by_peer_read_cache_file_latency");
bvar::LatencyRecorder g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency(
"cloud_internal_service_get_file_cache_meta_by_tablet_id_latency");
// Counters published for the sync_tablet_meta RPC handled by CloudInternalServiceImpl.
// Incremented once per request whose task was accepted by the work pool and started running.
bvar::Adder<int64_t> g_cloud_sync_tablet_meta_requests_total(
"cloud_sync_tablet_meta_requests_total");
// Number of tablets whose sync_meta() call returned OK.
bvar::Adder<int64_t> g_cloud_sync_tablet_meta_synced_total("cloud_sync_tablet_meta_synced_total");
// Number of requested tablets that were skipped because get_tablet_if_cached() returned null.
bvar::Adder<int64_t> g_cloud_sync_tablet_meta_skipped_total("cloud_sync_tablet_meta_skipped_total");
// Number of tablets whose sync_meta() call returned a non-OK status.
bvar::Adder<int64_t> g_cloud_sync_tablet_meta_failed_total("cloud_sync_tablet_meta_failed_total");

// Builds the cloud-mode internal service: `exec_env` is forwarded to the PInternalService base,
// and `engine` is kept in `_engine` for later use by the RPC handlers.
CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env)
: PInternalService(exec_env), _engine(engine) {}

// Defaulted destructor; this class adds no explicit teardown of its own here.
CloudInternalServiceImpl::~CloudInternalServiceImpl() = default;

// Handles the sync_tablet_meta RPC in cloud mode.
//
// The work is submitted to `_light_work_pool`. For every tablet id in `request`:
//   - a tablet that get_tablet_if_cached() does not return is counted as skipped;
//   - a tablet whose sync_meta() fails is counted as failed and a warning is logged;
//   - otherwise the tablet is counted as synced.
// The three counts are written to `response` and added to the matching bvar counters.
// The response status is OK whenever the task ran, even if individual tablets failed.
//
// If the pool does not accept the task, the response status is set to InternalError.
// On both paths `done` is handed to a brpc::ClosureGuard.
void CloudInternalServiceImpl::sync_tablet_meta(google::protobuf::RpcController* controller,
                                                const PSyncTabletMetaRequest* request,
                                                PSyncTabletMetaResponse* response,
                                                google::protobuf::Closure* done) {
    const bool offered = _light_work_pool.try_offer([this, request, response, done]() {
        brpc::ClosureGuard done_guard(done);
        g_cloud_sync_tablet_meta_requests_total << 1;

        int64_t num_synced = 0;
        int64_t num_skipped = 0;
        int64_t num_failed = 0;
        for (const auto tablet_id : request->tablet_ids()) {
            auto cached_tablet = _engine.tablet_mgr().get_tablet_if_cached(tablet_id);
            if (!cached_tablet) {
                // Not cached on this backend: nothing to refresh.
                ++num_skipped;
                continue;
            }
            if (auto sync_status = cached_tablet->sync_meta(); sync_status.ok()) {
                ++num_synced;
            } else {
                ++num_failed;
                LOG(WARNING) << "failed to sync tablet meta from cloud meta service, tablet="
                             << tablet_id << ", err=" << sync_status;
            }
        }

        // Publish the per-request totals to the metrics ...
        g_cloud_sync_tablet_meta_synced_total << num_synced;
        g_cloud_sync_tablet_meta_skipped_total << num_skipped;
        g_cloud_sync_tablet_meta_failed_total << num_failed;

        // ... and report them back to the caller.
        response->set_synced_tablets(num_synced);
        response->set_skipped_tablets(num_skipped);
        response->set_failed_tablets(num_failed);
        Status::OK().to_protobuf(response->mutable_status());
    });
    if (offered) {
        return;
    }

    // The pool rejected the task, so the lambda will never run: answer the RPC here.
    brpc::ClosureGuard done_guard(done);
    Status::InternalError("failed to offer sync_tablet_meta request to work pool")
            .to_protobuf(response->mutable_status());
}

void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* controller,
const doris::PAlterVaultSyncRequest* request,
PAlterVaultSyncResponse* response,
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class CloudInternalServiceImpl final : public PInternalService {
PAlterVaultSyncResponse* response,
google::protobuf::Closure* done) override;

// Cloud-mode handler for the sync_tablet_meta RPC: syncs the meta of the requested tablets
// that are cached on this backend and reports synced/skipped/failed counts in `response`.
void sync_tablet_meta(google::protobuf::RpcController* controller,
const PSyncTabletMetaRequest* request, PSyncTabletMetaResponse* response,
google::protobuf::Closure* done) override;

// Get messages (filename, offset, size) about the tablet data in cache
void get_file_cache_meta_by_tablet_id(google::protobuf::RpcController* controller,
const PGetFileCacheMetaRequest* request,
Expand Down
14 changes: 10 additions & 4 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,12 @@ void CloudTablet::get_compaction_status(std::string* json_result) {

// get snapshot version path json_doc
_timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
rapidjson::Value compaction_policy_value;
auto compaction_policy = _tablet_meta->compaction_policy();
compaction_policy_value.SetString(compaction_policy.c_str(),
cast_set<uint32_t>(compaction_policy.length()),
root.GetAllocator());
root.AddMember("compaction policy", compaction_policy_value, root.GetAllocator());
root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
rapidjson::Value cumu_value;
std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load());
Expand Down Expand Up @@ -1376,10 +1382,6 @@ void CloudTablet::agg_delete_bitmap_for_compaction(
}

Status CloudTablet::sync_meta() {
if (!config::enable_file_cache) {
return Status::OK();
}

TabletMetaSharedPtr tablet_meta;
auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
if (!st.ok()) {
Expand All @@ -1393,6 +1395,10 @@ Status CloudTablet::sync_meta() {
if (_tablet_meta->compaction_policy() != new_compaction_policy) {
_tablet_meta->set_compaction_policy(new_compaction_policy);
}
auto new_ttl_seconds = tablet_meta->ttl_seconds();
if (_tablet_meta->ttl_seconds() != new_ttl_seconds) {
_tablet_meta->set_ttl_seconds(new_ttl_seconds);
}
auto new_time_series_compaction_goal_size_mbytes =
tablet_meta->time_series_compaction_goal_size_mbytes();
if (_tablet_meta->time_series_compaction_goal_size_mbytes() !=
Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_tablet_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ bool CloudTabletMgr::peek_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* ta
return true;
}

// Looks `tablet_id` up in `_tablet_map` and returns whatever that lookup yields.
// This function consults nothing else; callers treat a null result as "not cached".
std::shared_ptr<CloudTablet> CloudTabletMgr::get_tablet_if_cached(int64_t tablet_id) {
    std::shared_ptr<CloudTablet> cached_tablet = _tablet_map->get(tablet_id);
    return cached_tablet;
}

void CloudTabletMgr::erase_tablet(int64_t tablet_id) {
auto tablet_id_str = std::to_string(tablet_id);
CacheKey key(tablet_id_str.data(), tablet_id_str.size());
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_tablet_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ class CloudTabletMgr {
// Return true if cached tablet meta is found (without triggering RPC) and filled.
bool peek_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta);

// Return the tablet found in the tablet map for `tablet_id`; callers check the result for null.
// NOTE(review): appears not to load the tablet from the meta service on a miss, unlike
// get_tablet() — confirm against the tablet map implementation.
std::shared_ptr<CloudTablet> get_tablet_if_cached(int64_t tablet_id);

void erase_tablet(int64_t tablet_id);

void vacuum_stale_rowsets(const CountDownLatch& stop_latch);
Expand Down
9 changes: 9 additions & 0 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2530,5 +2530,14 @@ void PInternalService::request_cdc_client(google::protobuf::RpcController* contr
}
}

// Base implementation of the sync_tablet_meta RPC: it performs no work and only fills the
// response status with NotSupported. `done` is handed to a brpc::ClosureGuard.
void PInternalService::sync_tablet_meta(google::protobuf::RpcController* controller,
                                        const PSyncTabletMetaRequest* request,
                                        PSyncTabletMetaResponse* response,
                                        google::protobuf::Closure* done) {
    brpc::ClosureGuard done_guard(done);
    auto unsupported = Status::NotSupported("sync_tablet_meta only supports cloud mode");
    unsupported.to_protobuf(response->mutable_status());
}

#include "common/compile_check_avoid_end.h"
} // namespace doris
4 changes: 4 additions & 0 deletions be/src/service/internal_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ class PInternalService : public PBackendService {
const PGetBeResourceRequest* request, PGetBeResourceResponse* response,
google::protobuf::Closure* done) override;

// sync_tablet_meta RPC entry point. The PInternalService definition only answers with a
// NotSupported status ("sync_tablet_meta only supports cloud mode").
void sync_tablet_meta(google::protobuf::RpcController* controller,
const PSyncTabletMetaRequest* request, PSyncTabletMetaResponse* response,
google::protobuf::Closure* done) override;

void delete_dictionary(google::protobuf::RpcController* controller,
const PDeleteDictionaryRequest* request,
PDeleteDictionaryResponse* response,
Expand Down
160 changes: 160 additions & 0 deletions be/test/cloud/cloud_internal_service_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "cloud/cloud_internal_service.h"

#include <chrono>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <unordered_map>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet_mgr.h"
#include "cpp/sync_point.h"
#include "gen_cpp/Status_types.h"
#include "storage/tablet/tablet_meta.h"
#include "util/uid_util.h"

namespace doris {

namespace {

class TestClosure final : public google::protobuf::Closure {
public:
explicit TestClosure(std::function<void()> callback) : _callback(std::move(callback)) {}

void Run() override {
if (_callback) {
_callback();
}
}

private:
std::function<void()> _callback;
};

} // namespace

// Fixture for CloudInternalServiceImpl tests.
//
// SetUp() installs SyncPoint callbacks for "CloudMetaMgr::sync_tablet_rowsets" and
// "CloudMetaMgr::get_tablet_meta". The get_tablet_meta callback counts calls per tablet id:
//   - the first call for a tablet yields a meta with compaction policy "size_based";
//   - later calls yield "time_series", except for kFailedTabletId, where they return an
//     injected InternalError instead.
// TearDown() disables SyncPoint processing and removes the callbacks.
class CloudInternalServiceTest : public testing::Test {
public:
    CloudInternalServiceTest() : _engine(CloudStorageEngine(EngineOptions())) {}

protected:
    static constexpr int64_t kSyncedTabletId = 90001;
    static constexpr int64_t kFailedTabletId = 90002;
    static constexpr int64_t kUncachedTabletId = 90003;

    void SetUp() override {
        auto sync_point = SyncPoint::get_instance();
        sync_point->clear_all_call_backs();
        sync_point->enable_processing();

        sync_point->set_call_back("CloudMetaMgr::sync_tablet_rowsets", [](auto&& args) {
            auto* result = try_any_cast_ret<Status>(args);
            result->second = true;
        });
        sync_point->set_call_back("CloudMetaMgr::get_tablet_meta", [this](auto&& args) {
            const auto tablet_id = try_any_cast<int64_t>(args[0]);
            auto* out_tablet_meta = try_any_cast<TabletMetaSharedPtr*>(args[1]);
            auto* result = try_any_cast_ret<Status>(args);

            // Count this fetch; anything after the first one is treated as a re-sync.
            const int fetch_count = ++_tablet_meta_call_count[tablet_id];
            const bool is_refetch = fetch_count >= 2;

            result->second = true;
            if (is_refetch && tablet_id == kFailedTabletId) {
                result->first = Status::InternalError<false>("injected sync_meta failure");
                return;
            }
            *out_tablet_meta =
                    create_tablet_meta(tablet_id, is_refetch ? "time_series" : "size_based");
        });
    }

    void TearDown() override {
        auto sync_point = SyncPoint::get_instance();
        sync_point->disable_processing();
        sync_point->clear_all_call_backs();
    }

    // Builds a TabletMeta for `tablet_id` with a single INT key column named "c1" in a
    // DUP_KEYS schema, and assigns it the given compaction policy.
    TabletMetaSharedPtr create_tablet_meta(int64_t tablet_id, std::string compaction_policy) {
        TColumnType int_type;
        int_type.__set_type(TPrimitiveType::INT);

        TColumn key_column;
        key_column.__set_column_name("c1");
        key_column.__set_column_type(int_type);
        key_column.__set_is_key(true);
        key_column.__set_aggregation_type(TAggregationType::NONE);
        key_column.__set_col_unique_id(0);

        TTabletSchema schema;
        schema.__set_columns({key_column});
        schema.__set_keys_type(TKeysType::DUP_KEYS);

        std::unordered_map<uint32_t, uint32_t> ordinal_to_unique_id = {{0, 0}};
        auto meta = std::make_shared<TabletMeta>(
                1, 2, tablet_id, 15674, 4, 5, schema, 1, ordinal_to_unique_id, UniqueId(9, 10),
                TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F);
        meta->set_compaction_policy(std::move(compaction_policy));
        return meta;
    }

    CloudStorageEngine _engine;
    // Number of get_tablet_meta callback invocations seen per tablet id.
    std::unordered_map<int64_t, int> _tablet_meta_call_count;
};

// Sends one sync_tablet_meta request naming a cached tablet that re-syncs fine, a tablet that
// was never loaded, and a cached tablet whose re-sync fails. Expects one of each in the
// response and that only the successfully synced tablet picked up the new compaction policy.
TEST_F(CloudInternalServiceTest, TestSyncTabletMetaCountsSyncedSkippedAndFailedTablets) {
    // Load two tablets so they are cached; the first meta fetch reports "size_based".
    auto synced_lookup = _engine.tablet_mgr().get_tablet(kSyncedTabletId);
    ASSERT_TRUE(synced_lookup.has_value()) << synced_lookup.error();
    auto failed_lookup = _engine.tablet_mgr().get_tablet(kFailedTabletId);
    ASSERT_TRUE(failed_lookup.has_value()) << failed_lookup.error();

    auto tablet_to_sync = synced_lookup.value();
    auto tablet_to_fail = failed_lookup.value();
    ASSERT_EQ("size_based", tablet_to_sync->tablet_meta()->compaction_policy());
    ASSERT_EQ("size_based", tablet_to_fail->tablet_meta()->compaction_policy());

    CloudInternalServiceImpl service(_engine, nullptr);
    PSyncTabletMetaResponse response;
    PSyncTabletMetaRequest request;
    request.add_tablet_ids(kSyncedTabletId);
    request.add_tablet_ids(kUncachedTabletId);
    request.add_tablet_ids(kFailedTabletId);

    // The handler runs on a work pool; the closure signals this thread once it has finished.
    std::mutex done_mutex;
    std::condition_variable done_cv;
    bool rpc_finished = false;
    TestClosure closure([&done_mutex, &done_cv, &rpc_finished]() {
        std::lock_guard<std::mutex> guard(done_mutex);
        rpc_finished = true;
        done_cv.notify_all();
    });

    service.sync_tablet_meta(nullptr, &request, &response, &closure);

    // NOTE(review): if this wait times out, the ASSERT returns while the submitted task may
    // still hold pointers to `request`, `response` and `closure` on this stack frame.
    std::unique_lock<std::mutex> wait_lock(done_mutex);
    ASSERT_TRUE(done_cv.wait_for(wait_lock, std::chrono::seconds(5),
                                 [&rpc_finished] { return rpc_finished; }));

    ASSERT_TRUE(response.has_status());
    EXPECT_EQ(TStatusCode::OK, response.status().status_code()) << response.status().DebugString();
    EXPECT_EQ(1, response.synced_tablets());
    EXPECT_EQ(1, response.skipped_tablets());
    EXPECT_EQ(1, response.failed_tablets());
    EXPECT_EQ("time_series", tablet_to_sync->tablet_meta()->compaction_policy());
    EXPECT_EQ("size_based", tablet_to_fail->tablet_meta()->compaction_policy());
    EXPECT_EQ(nullptr, _engine.tablet_mgr().get_tablet_if_cached(kUncachedTabletId));
}

} // namespace doris
32 changes: 32 additions & 0 deletions be/test/cloud/cloud_tablet_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,4 +178,36 @@ TEST_F(CloudTabletMgrTest, TestConcurrentGetTabletTabletMapConsistency) {
sp->clear_all_call_backs();
}

// Verifies that get_tablet_if_cached() returns nullptr for a tablet that has not been loaded
// through get_tablet(), and returns the tablet once get_tablet() has loaded it.
TEST_F(CloudTabletMgrTest, TestGetTabletIfCachedOnlyReturnsCachedTablet) {
    auto sp = SyncPoint::get_instance();
    sp->clear_all_call_backs();
    sp->enable_processing();

    // Resets the SyncPoint instance on every exit path. A failing ASSERT_* returns from the
    // test body immediately, so cleanup written at the end of the body would be skipped and
    // the callback below (which captures `this`) would stay registered with processing on.
    struct SyncPointReset {
        ~SyncPointReset() {
            auto instance = SyncPoint::get_instance();
            instance->disable_processing();
            instance->clear_all_call_backs();
        }
    };

    // Serve the fixture's `_tablet_meta` for any meta fetch and short-circuit rowset sync.
    sp->set_call_back("CloudMetaMgr::get_tablet_meta", [this](auto&& args) {
        auto* tablet_meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]);
        *tablet_meta_ptr = _tablet_meta;
        try_any_cast_ret<Status>(args)->second = true;
    });
    sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets",
                      [](auto&& args) { try_any_cast_ret<Status>(args)->second = true; });

    CloudTabletMgr mgr(_engine);
    // Declared after `mgr` so the reset still happens before `mgr` is destroyed, matching the
    // order of the explicit end-of-body cleanup this guard replaces.
    SyncPointReset sync_point_reset;

    const int64_t cached_tablet_id = 99999;
    const int64_t uncached_tablet_id = 100000;

    // Nothing has been loaded yet.
    EXPECT_EQ(nullptr, mgr.get_tablet_if_cached(cached_tablet_id));

    auto res = mgr.get_tablet(cached_tablet_id);
    ASSERT_TRUE(res.has_value()) << res.error();

    auto cached_tablet = mgr.get_tablet_if_cached(cached_tablet_id);
    ASSERT_NE(nullptr, cached_tablet);
    EXPECT_EQ(cached_tablet_id, cached_tablet->tablet_id());

    // A different id that was never loaded must still miss.
    EXPECT_EQ(nullptr, mgr.get_tablet_if_cached(uncached_tablet_id));
}

} // namespace doris
Loading
Loading