From 324fe418b174d3b18138abe83fbe7f13cef0cefb Mon Sep 17 00:00:00 2001
From: Luwei <814383175@qq.com>
Date: Fri, 20 Mar 2026 23:00:03 +0800
Subject: [PATCH] [enhance](cloud) proactively sync tablet meta after alter

FE now sends a sync_tablet_meta RPC to all alive cloud backends after
alter updates tablet meta in meta service. The request carries affected
tablet ids and is dispatched as a best-effort notification, so alter
success still depends on meta service update instead of backend
acknowledgements.

BE handles the RPC by refreshing meta only for tablets that are already
cached locally. Uncached tablets are skipped, which avoids polluting
tablet cache while still fixing stale compaction policy and related
tablet meta on active compute clusters. The RPC also returns
synced/skipped/failed counts and exposes bvar counters for
observability.

This change adds FE and BE unit tests and a cloud regression suite. The
regression covers cached and uncached multi-cluster behavior, the
negative path with proactive notify disabled, and the version-limit
scenario where a size_based table hits too many versions, is altered to
time_series, and can accept new writes immediately after alter.
--- be/src/cloud/cloud_internal_service.cpp | 45 +++ be/src/cloud/cloud_internal_service.h | 4 + be/src/cloud/cloud_tablet.cpp | 14 +- be/src/cloud/cloud_tablet_mgr.cpp | 4 + be/src/cloud/cloud_tablet_mgr.h | 2 + be/src/service/internal_service.cpp | 9 + be/src/service/internal_service.h | 4 + be/test/cloud/cloud_internal_service_test.cpp | 160 ++++++++++ be/test/cloud/cloud_tablet_mgr_test.cpp | 32 ++ be/test/cloud/cloud_tablet_test.cpp | 51 +++- .../cloud/alter/CloudSchemaChangeHandler.java | 63 ++++ .../doris/rpc/BackendServiceClient.java | 5 + .../apache/doris/rpc/BackendServiceProxy.java | 12 + .../alter/CloudSchemaChangeHandlerTest.java | 269 +++++++++++++++++ gensrc/proto/internal_service.proto | 13 +- ..._cloud_sync_tablet_meta_after_alter.groovy | 284 ++++++++++++++++++ 16 files changed, 959 insertions(+), 12 deletions(-) create mode 100644 be/test/cloud/cloud_internal_service_test.cpp create mode 100644 fe/fe-core/src/test/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandlerTest.java create mode 100644 regression-test/suites/fault_injection_p0/cloud/test_cloud_sync_tablet_meta_after_alter.groovy diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp index 99b08a9bf38564..573cbf2e96a1e2 100644 --- a/be/src/cloud/cloud_internal_service.cpp +++ b/be/src/cloud/cloud_internal_service.cpp @@ -49,12 +49,57 @@ bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency( "file_cache_get_by_peer_read_cache_file_latency"); bvar::LatencyRecorder g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency( "cloud_internal_service_get_file_cache_meta_by_tablet_id_latency"); +bvar::Adder g_cloud_sync_tablet_meta_requests_total( + "cloud_sync_tablet_meta_requests_total"); +bvar::Adder g_cloud_sync_tablet_meta_synced_total("cloud_sync_tablet_meta_synced_total"); +bvar::Adder g_cloud_sync_tablet_meta_skipped_total("cloud_sync_tablet_meta_skipped_total"); +bvar::Adder 
g_cloud_sync_tablet_meta_failed_total("cloud_sync_tablet_meta_failed_total"); CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env) : PInternalService(exec_env), _engine(engine) {} CloudInternalServiceImpl::~CloudInternalServiceImpl() = default; +void CloudInternalServiceImpl::sync_tablet_meta(google::protobuf::RpcController* controller, + const PSyncTabletMetaRequest* request, + PSyncTabletMetaResponse* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + int64_t synced = 0; + int64_t skipped = 0; + int64_t failed = 0; + g_cloud_sync_tablet_meta_requests_total << 1; + for (const auto tablet_id : request->tablet_ids()) { + auto tablet = _engine.tablet_mgr().get_tablet_if_cached(tablet_id); + if (!tablet) { + ++skipped; + continue; + } + auto st = tablet->sync_meta(); + if (!st.ok()) { + ++failed; + LOG(WARNING) << "failed to sync tablet meta from cloud meta service, tablet=" + << tablet_id << ", err=" << st; + continue; + } + ++synced; + } + g_cloud_sync_tablet_meta_synced_total << synced; + g_cloud_sync_tablet_meta_skipped_total << skipped; + g_cloud_sync_tablet_meta_failed_total << failed; + response->set_synced_tablets(synced); + response->set_skipped_tablets(skipped); + response->set_failed_tablets(failed); + Status::OK().to_protobuf(response->mutable_status()); + }); + if (!ret) { + brpc::ClosureGuard closure_guard(done); + Status::InternalError("failed to offer sync_tablet_meta request to work pool") + .to_protobuf(response->mutable_status()); + } +} + void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* controller, const doris::PAlterVaultSyncRequest* request, PAlterVaultSyncResponse* response, diff --git a/be/src/cloud/cloud_internal_service.h b/be/src/cloud/cloud_internal_service.h index db4916313fe596..d239d891738b4a 100644 --- a/be/src/cloud/cloud_internal_service.h +++ 
b/be/src/cloud/cloud_internal_service.h @@ -34,6 +34,10 @@ class CloudInternalServiceImpl final : public PInternalService { PAlterVaultSyncResponse* response, google::protobuf::Closure* done) override; + void sync_tablet_meta(google::protobuf::RpcController* controller, + const PSyncTabletMetaRequest* request, PSyncTabletMetaResponse* response, + google::protobuf::Closure* done) override; + // Get messages (filename, offset, size) about the tablet data in cache void get_file_cache_meta_by_tablet_id(google::protobuf::RpcController* controller, const PGetFileCacheMetaRequest* request, diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 2567019301cff5..0eb7181c7e26a9 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -868,6 +868,12 @@ void CloudTablet::get_compaction_status(std::string* json_result) { // get snapshot version path json_doc _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr); + rapidjson::Value compaction_policy_value; + auto compaction_policy = _tablet_meta->compaction_policy(); + compaction_policy_value.SetString(compaction_policy.c_str(), + cast_set(compaction_policy.length()), + root.GetAllocator()); + root.AddMember("compaction policy", compaction_policy_value, root.GetAllocator()); root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator()); rapidjson::Value cumu_value; std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load()); @@ -1376,10 +1382,6 @@ void CloudTablet::agg_delete_bitmap_for_compaction( } Status CloudTablet::sync_meta() { - if (!config::enable_file_cache) { - return Status::OK(); - } - TabletMetaSharedPtr tablet_meta; auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta); if (!st.ok()) { @@ -1393,6 +1395,10 @@ Status CloudTablet::sync_meta() { if (_tablet_meta->compaction_policy() != new_compaction_policy) { _tablet_meta->set_compaction_policy(new_compaction_policy); } + auto 
new_ttl_seconds = tablet_meta->ttl_seconds(); + if (_tablet_meta->ttl_seconds() != new_ttl_seconds) { + _tablet_meta->set_ttl_seconds(new_ttl_seconds); + } auto new_time_series_compaction_goal_size_mbytes = tablet_meta->time_series_compaction_goal_size_mbytes(); if (_tablet_meta->time_series_compaction_goal_size_mbytes() != diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 3e979864138645..6d0648f9233413 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -288,6 +288,10 @@ bool CloudTabletMgr::peek_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* ta return true; } +std::shared_ptr CloudTabletMgr::get_tablet_if_cached(int64_t tablet_id) { + return _tablet_map->get(tablet_id); +} + void CloudTabletMgr::erase_tablet(int64_t tablet_id) { auto tablet_id_str = std::to_string(tablet_id); CacheKey key(tablet_id_str.data(), tablet_id_str.size()); diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h index 9894d97552b872..b9b6d06e12efb5 100644 --- a/be/src/cloud/cloud_tablet_mgr.h +++ b/be/src/cloud/cloud_tablet_mgr.h @@ -62,6 +62,8 @@ class CloudTabletMgr { // Return true if cached tablet meta is found (without triggering RPC) and filled. 
bool peek_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta); + std::shared_ptr get_tablet_if_cached(int64_t tablet_id); + void erase_tablet(int64_t tablet_id); void vacuum_stale_rowsets(const CountDownLatch& stop_latch); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 2f2250f265db90..e46526fe0cea90 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -2530,5 +2530,14 @@ void PInternalService::request_cdc_client(google::protobuf::RpcController* contr } } +void PInternalService::sync_tablet_meta(google::protobuf::RpcController* controller, + const PSyncTabletMetaRequest* request, + PSyncTabletMetaResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard closure_guard(done); + Status::NotSupported("sync_tablet_meta only supports cloud mode") + .to_protobuf(response->mutable_status()); +} + #include "common/compile_check_avoid_end.h" } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index b1e11ec34b73c4..3042d2cae3a105 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -223,6 +223,10 @@ class PInternalService : public PBackendService { const PGetBeResourceRequest* request, PGetBeResourceResponse* response, google::protobuf::Closure* done) override; + void sync_tablet_meta(google::protobuf::RpcController* controller, + const PSyncTabletMetaRequest* request, PSyncTabletMetaResponse* response, + google::protobuf::Closure* done) override; + void delete_dictionary(google::protobuf::RpcController* controller, const PDeleteDictionaryRequest* request, PDeleteDictionaryResponse* response, diff --git a/be/test/cloud/cloud_internal_service_test.cpp b/be/test/cloud/cloud_internal_service_test.cpp new file mode 100644 index 00000000000000..fb2610c3b96245 --- /dev/null +++ b/be/test/cloud/cloud_internal_service_test.cpp @@ -0,0 +1,160 @@ +// Licensed to the Apache Software 
Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_internal_service.h" + +#include +#include +#include +#include +#include + +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet_mgr.h" +#include "cpp/sync_point.h" +#include "gen_cpp/Status_types.h" +#include "storage/tablet/tablet_meta.h" +#include "util/uid_util.h" + +namespace doris { + +namespace { + +class TestClosure final : public google::protobuf::Closure { +public: + explicit TestClosure(std::function callback) : _callback(std::move(callback)) {} + + void Run() override { + if (_callback) { + _callback(); + } + } + +private: + std::function _callback; +}; + +} // namespace + +class CloudInternalServiceTest : public testing::Test { +public: + CloudInternalServiceTest() : _engine(CloudStorageEngine(EngineOptions())) {} + +protected: + static constexpr int64_t kSyncedTabletId = 90001; + static constexpr int64_t kFailedTabletId = 90002; + static constexpr int64_t kUncachedTabletId = 90003; + + void SetUp() override { + auto sp = SyncPoint::get_instance(); + sp->clear_all_call_backs(); + sp->enable_processing(); + + sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", + [](auto&& args) { try_any_cast_ret(args)->second = true; }); + 
sp->set_call_back("CloudMetaMgr::get_tablet_meta", [this](auto&& args) { + const auto tablet_id = try_any_cast(args[0]); + auto* tablet_meta_ptr = try_any_cast(args[1]); + auto* ret = try_any_cast_ret(args); + + auto& call_count = _tablet_meta_call_count[tablet_id]; + ++call_count; + if (tablet_id == kFailedTabletId && call_count >= 2) { + ret->first = Status::InternalError("injected sync_meta failure"); + ret->second = true; + return; + } + + *tablet_meta_ptr = + create_tablet_meta(tablet_id, call_count >= 2 ? "time_series" : "size_based"); + ret->second = true; + }); + } + + void TearDown() override { + auto sp = SyncPoint::get_instance(); + sp->disable_processing(); + sp->clear_all_call_backs(); + } + + TabletMetaSharedPtr create_tablet_meta(int64_t tablet_id, std::string compaction_policy) { + TTabletSchema tablet_schema; + TColumn column; + column.__set_column_name("c1"); + column.__set_column_type(TColumnType()); + column.column_type.__set_type(TPrimitiveType::INT); + column.__set_is_key(true); + column.__set_aggregation_type(TAggregationType::NONE); + column.__set_col_unique_id(0); + tablet_schema.__set_columns({column}); + tablet_schema.__set_keys_type(TKeysType::DUP_KEYS); + + std::unordered_map col_ordinal_to_unique_id = {{0, 0}}; + auto tablet_meta = std::make_shared( + 1, 2, tablet_id, 15674, 4, 5, tablet_schema, 1, col_ordinal_to_unique_id, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F); + tablet_meta->set_compaction_policy(std::move(compaction_policy)); + return tablet_meta; + } + + CloudStorageEngine _engine; + std::unordered_map _tablet_meta_call_count; +}; + +TEST_F(CloudInternalServiceTest, TestSyncTabletMetaCountsSyncedSkippedAndFailedTablets) { + auto synced_res = _engine.tablet_mgr().get_tablet(kSyncedTabletId); + ASSERT_TRUE(synced_res.has_value()) << synced_res.error(); + auto failed_res = _engine.tablet_mgr().get_tablet(kFailedTabletId); + ASSERT_TRUE(failed_res.has_value()) << failed_res.error(); + + auto 
cached_synced_tablet = synced_res.value(); + auto cached_failed_tablet = failed_res.value(); + ASSERT_EQ("size_based", cached_synced_tablet->tablet_meta()->compaction_policy()); + ASSERT_EQ("size_based", cached_failed_tablet->tablet_meta()->compaction_policy()); + + CloudInternalServiceImpl service(_engine, nullptr); + PSyncTabletMetaRequest request; + request.add_tablet_ids(kSyncedTabletId); + request.add_tablet_ids(kUncachedTabletId); + request.add_tablet_ids(kFailedTabletId); + PSyncTabletMetaResponse response; + + std::mutex mutex; + std::condition_variable cv; + bool done = false; + TestClosure closure([&]() { + std::lock_guard lock(mutex); + done = true; + cv.notify_all(); + }); + + service.sync_tablet_meta(nullptr, &request, &response, &closure); + + std::unique_lock lock(mutex); + ASSERT_TRUE(cv.wait_for(lock, std::chrono::seconds(5), [&] { return done; })); + + ASSERT_TRUE(response.has_status()); + EXPECT_EQ(TStatusCode::OK, response.status().status_code()) << response.status().DebugString(); + EXPECT_EQ(1, response.synced_tablets()); + EXPECT_EQ(1, response.skipped_tablets()); + EXPECT_EQ(1, response.failed_tablets()); + EXPECT_EQ("time_series", cached_synced_tablet->tablet_meta()->compaction_policy()); + EXPECT_EQ("size_based", cached_failed_tablet->tablet_meta()->compaction_policy()); + EXPECT_EQ(nullptr, _engine.tablet_mgr().get_tablet_if_cached(kUncachedTabletId)); +} + +} // namespace doris diff --git a/be/test/cloud/cloud_tablet_mgr_test.cpp b/be/test/cloud/cloud_tablet_mgr_test.cpp index 3c9e0b26eb1059..c893b72a475c7f 100644 --- a/be/test/cloud/cloud_tablet_mgr_test.cpp +++ b/be/test/cloud/cloud_tablet_mgr_test.cpp @@ -178,4 +178,36 @@ TEST_F(CloudTabletMgrTest, TestConcurrentGetTabletTabletMapConsistency) { sp->clear_all_call_backs(); } +TEST_F(CloudTabletMgrTest, TestGetTabletIfCachedOnlyReturnsCachedTablet) { + auto sp = SyncPoint::get_instance(); + sp->clear_all_call_backs(); + sp->enable_processing(); + + 
sp->set_call_back("CloudMetaMgr::get_tablet_meta", [this](auto&& args) { + auto* tablet_meta_ptr = try_any_cast(args[1]); + *tablet_meta_ptr = _tablet_meta; + try_any_cast_ret(args)->second = true; + }); + sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", + [](auto&& args) { try_any_cast_ret(args)->second = true; }); + + CloudTabletMgr mgr(_engine); + const int64_t cached_tablet_id = 99999; + const int64_t uncached_tablet_id = 100000; + + EXPECT_EQ(nullptr, mgr.get_tablet_if_cached(cached_tablet_id)); + + auto res = mgr.get_tablet(cached_tablet_id); + ASSERT_TRUE(res.has_value()) << res.error(); + + auto cached_tablet = mgr.get_tablet_if_cached(cached_tablet_id); + ASSERT_NE(nullptr, cached_tablet); + EXPECT_EQ(cached_tablet_id, cached_tablet->tablet_id()); + + EXPECT_EQ(nullptr, mgr.get_tablet_if_cached(uncached_tablet_id)); + + sp->disable_processing(); + sp->clear_all_call_backs(); +} + } // namespace doris diff --git a/be/test/cloud/cloud_tablet_test.cpp b/be/test/cloud/cloud_tablet_test.cpp index 66a07cbc29606a..336d0420769adf 100644 --- a/be/test/cloud/cloud_tablet_test.cpp +++ b/be/test/cloud/cloud_tablet_test.cpp @@ -721,6 +721,7 @@ class CloudTabletSyncMetaTest : public testing::Test { const std::string& compaction_policy = "size_based") { TTabletSchema new_schema; new_schema.__set_disable_auto_compaction(disable_auto_compaction); + new_schema.__set_is_in_memory(false); TColumn col; col.__set_column_name("test_col_" + std::to_string(_current_tablet_id)); col.__set_column_type(TColumnType()); @@ -878,8 +879,8 @@ TEST_F(CloudTabletSyncMetaTest, TestSyncMetaDisableAutoCompactionUnchanged) { sp->clear_all_call_backs(); } -// Test sync_meta is skipped when enable_file_cache is false -TEST_F(CloudTabletSyncMetaTest, TestSyncMetaSkippedWhenFileCacheDisabled) { +// sync_meta should keep refreshing tablet meta even when file cache is disabled. 
+TEST_F(CloudTabletSyncMetaTest, TestSyncMetaWhenFileCacheDisabled) { // Disable file cache config::enable_file_cache = false; @@ -890,17 +891,22 @@ TEST_F(CloudTabletSyncMetaTest, TestSyncMetaSkippedWhenFileCacheDisabled) { sp->clear_all_call_backs(); sp->enable_processing(); + auto mock_tablet_meta = createMockTabletMeta(true, "time_series"); bool callback_called = false; sp->set_call_back("CloudMetaMgr::get_tablet_meta", - [&callback_called](auto&& args) { callback_called = true; }); + [mock_tablet_meta, &callback_called](auto&& args) { + callback_called = true; + auto* tablet_meta_ptr = try_any_cast(args[1]); + *tablet_meta_ptr = mock_tablet_meta; + try_any_cast_ret(args)->second = true; + }); - // Call sync_meta - should return early without calling get_tablet_meta Status st = _tablet->sync_meta(); EXPECT_TRUE(st.ok()); - EXPECT_FALSE(callback_called); + EXPECT_TRUE(callback_called); - // Verify disable_auto_compaction is not changed - EXPECT_FALSE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()); + EXPECT_TRUE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()); + EXPECT_EQ(_tablet->tablet_meta()->compaction_policy(), "time_series"); sp->disable_processing(); sp->clear_all_call_backs(); @@ -940,6 +946,37 @@ TEST_F(CloudTabletSyncMetaTest, TestSyncMetaMultipleProperties) { sp->disable_processing(); sp->clear_all_call_backs(); } + +TEST_F(CloudTabletSyncMetaTest, TestSyncMetaSyncsTtlAndInMemory) { + EXPECT_EQ(0, _tablet->tablet_meta()->ttl_seconds()); + EXPECT_FALSE(_tablet->tablet_meta()->tablet_schema()->is_in_memory()); + + auto sp = SyncPoint::get_instance(); + sp->clear_all_call_backs(); + sp->enable_processing(); + + auto mock_tablet_meta = createMockTabletMeta(true, "time_series"); + mock_tablet_meta->set_ttl_seconds(3600); + mock_tablet_meta->mutable_tablet_schema()->set_is_in_memory(true); + + sp->set_call_back("CloudMetaMgr::get_tablet_meta", [mock_tablet_meta](auto&& args) { + auto* tablet_meta_ptr = 
try_any_cast(args[1]); + *tablet_meta_ptr = mock_tablet_meta; + try_any_cast_ret(args)->second = true; + }); + + Status st = _tablet->sync_meta(); + EXPECT_TRUE(st.ok()); + + EXPECT_EQ(3600, _tablet->tablet_meta()->ttl_seconds()); + EXPECT_TRUE(_tablet->tablet_meta()->tablet_schema()->is_in_memory()); + EXPECT_TRUE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()); + EXPECT_EQ(_tablet->tablet_meta()->compaction_policy(), "time_series"); + + sp->disable_processing(); + sp->clear_all_call_backs(); +} + class CloudTabletApplyVisiblePendingTest : public testing::Test { public: CloudTabletApplyVisiblePendingTest() : _engine(CloudStorageEngine(EngineOptions {})) {} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java index c2155963273e09..b6d6a6d0337a4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java @@ -34,12 +34,21 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.proto.InternalService; +import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -540,6 +549,60 @@ public void updateCloudPartitionMeta(Database db, if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { throw new UserException(response.getStatus().getMsg()); } + + notifyBackendsToSyncTabletMeta(updateTabletReq.getTabletMetaInfosList().stream() + .map(Cloud.TabletMetaInfoPB::getTabletId) + .collect(Collectors.toList())); + } + } + + void notifyBackendsToSyncTabletMeta(List tabletIds) { + if (tabletIds.isEmpty()) { + return; + } + if (DebugPointUtil.isEnable("CloudSchemaChangeHandler.notifyBackendsToSyncTabletMeta.skip")) { + LOG.info("skip sync tablet meta rpc dispatch by debug point, tabletIds={}", tabletIds); + return; + } + List backends; + try { + backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); + } catch (UserException e) { + LOG.warn("failed to get alive backends for sync tablet meta, tabletIds={}", tabletIds, e); + return; + } + + InternalService.PSyncTabletMetaRequest request = InternalService.PSyncTabletMetaRequest.newBuilder() + .addAllTabletIds(tabletIds) + .build(); + for (Backend backend : backends) { + if (!backend.isAlive() || backend.getBrpcPort() <= 0) { + continue; + } + try { + ListenableFuture future = + BackendServiceProxy.getInstance().syncTabletMeta(backend.getBrpcAddress(), request); + Futures.addCallback(future, new FutureCallback() { + @Override + public void onSuccess(InternalService.PSyncTabletMetaResponse response) { + if (response == null || !response.hasStatus() + || response.getStatus().getStatusCode() != TStatusCode.OK.getValue()) { + LOG.warn("sync tablet meta rpc returned non-ok response, backendId={}, tabletIds={}," + + " response={}", + backend.getId(), tabletIds, response); + } + } + + @Override + public void onFailure(Throwable t) { + LOG.warn("sync tablet meta rpc failed, backendId={}, tabletIds={}", + backend.getId(), tabletIds, t); + } + }, MoreExecutors.directExecutor()); + } 
catch (Exception e) { + LOG.warn("failed to dispatch sync tablet meta rpc, backendId={}, tabletIds={}", + backend.getId(), tabletIds, e); + } } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index fc3dac0c214764..87ffc9ae64094b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -192,6 +192,11 @@ public Future alterVaultSync( return stub.alterVaultSync(request); } + public ListenableFuture syncTabletMeta( + InternalService.PSyncTabletMetaRequest request) { + return stub.syncTabletMeta(request); + } + public Future getBeResource(InternalService.PGetBeResourceRequest request, int timeoutSec) { return stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS).getBeResource(request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index fbdfb3cf223a70..d72514e0afa6b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -541,6 +541,18 @@ public Future alterVaultSync(TNetworkAddress address, } } + public ListenableFuture syncTabletMeta(TNetworkAddress address, + InternalService.PSyncTabletMetaRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.syncTabletMeta(request); + } catch (Throwable e) { + LOG.warn("failed to sync tablet meta from address={}:{}", address.getHostname(), + address.getPort(), e); + throw new RpcException(address.getHostname(), e.getMessage()); + } + } + public Future fetchRemoteTabletSchemaAsync( TNetworkAddress address, InternalService.PFetchRemoteSchemaRequest request) throws RpcException { try { diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandlerTest.java new file mode 100644 index 00000000000000..e2ad88a502ada6 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandlerTest.java @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.alter; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.MetaServiceProxy; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Futures; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class CloudSchemaChangeHandlerTest { + private int originalCloudTxnTabletBatchSize; + private String originalCloudUniqueId; + private String originalMetaServiceEndpoint; + + @Before + public void setUp() { + originalCloudTxnTabletBatchSize = Config.cloud_txn_tablet_batch_size; + originalCloudUniqueId = Config.cloud_unique_id; + originalMetaServiceEndpoint = Config.meta_service_endpoint; + Config.cloud_txn_tablet_batch_size = 2; + Config.cloud_unique_id = "cloud-test"; + Config.meta_service_endpoint = "127.0.0.1:20121"; + } + + @After + public void tearDown() { + Config.cloud_txn_tablet_batch_size = originalCloudTxnTabletBatchSize; + Config.cloud_unique_id = originalCloudUniqueId; + Config.meta_service_endpoint = originalMetaServiceEndpoint; + } + + @Test + public void testUpdateTablePropertiesNotifiesAllAliveBackendsInBatches() throws Exception { + 
CloudSchemaChangeHandler handler = new CloudSchemaChangeHandler(); + Database db = Mockito.mock(Database.class); + OlapTable table = Mockito.mock(OlapTable.class); + Partition partition = Mockito.mock(Partition.class); + MaterializedIndex index = Mockito.mock(MaterializedIndex.class); + org.apache.doris.catalog.Tablet tablet1 = Mockito.mock(org.apache.doris.catalog.Tablet.class); + org.apache.doris.catalog.Tablet tablet2 = Mockito.mock(org.apache.doris.catalog.Tablet.class); + org.apache.doris.catalog.Tablet tablet3 = Mockito.mock(org.apache.doris.catalog.Tablet.class); + + Mockito.when(db.getTableOrMetaException("tbl", org.apache.doris.catalog.Table.TableType.OLAP)) + .thenReturn(table); + Mockito.when(table.getName()).thenReturn("tbl"); + Mockito.when(table.getCompactionPolicy()).thenReturn("size_based"); + Mockito.when(table.getKeysType()).thenReturn(org.apache.doris.catalog.KeysType.DUP_KEYS); + Mockito.when(table.getPartitions()).thenReturn(Arrays.asList(partition)); + Mockito.when(table.getPartition("p1")).thenReturn(partition); + Mockito.when(partition.getName()).thenReturn("p1"); + Mockito.when(partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) + .thenReturn(Arrays.asList(index)); + Mockito.when(index.getTablets()).thenReturn(Arrays.asList(tablet1, tablet2, tablet3)); + Mockito.when(tablet1.getId()).thenReturn(101L); + Mockito.when(tablet2.getId()).thenReturn(102L); + Mockito.when(tablet3.getId()).thenReturn(103L); + + Backend aliveBackend1 = new Backend(1L, "be-1", 9050); + aliveBackend1.setAlive(true); + aliveBackend1.setBrpcPort(8060); + Backend aliveBackend2 = new Backend(2L, "be-2", 9050); + aliveBackend2.setAlive(true); + aliveBackend2.setBrpcPort(8061); + Backend deadBackend = new Backend(3L, "be-dead", 9050); + deadBackend.setAlive(false); + deadBackend.setBrpcPort(8062); + + SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class); + 
Mockito.when(systemInfoService.getAllBackendsByAllCluster()).thenReturn(ImmutableMap.of( + aliveBackend1.getId(), aliveBackend1, + aliveBackend2.getId(), aliveBackend2, + deadBackend.getId(), deadBackend)); + + Env env = Mockito.mock(Env.class); + + MetaServiceProxy metaServiceProxy = Mockito.mock(MetaServiceProxy.class); + Mockito.when(metaServiceProxy.updateTablet(Mockito.any())).thenReturn(okUpdateTabletResponse()); + + BackendServiceProxy backendServiceProxy = Mockito.mock(BackendServiceProxy.class); + Mockito.when(backendServiceProxy.syncTabletMeta(Mockito.any(), Mockito.any())) + .thenReturn(Futures.immediateFuture(okSyncTabletMetaResponse())); + + try (MockedStatic envMock = Mockito.mockStatic(Env.class); + MockedStatic metaProxyMock = Mockito.mockStatic(MetaServiceProxy.class); + MockedStatic backendProxyMock = Mockito.mockStatic(BackendServiceProxy.class)) { + envMock.when(Env::getCurrentEnv).thenReturn(env); + envMock.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService); + metaProxyMock.when(MetaServiceProxy::getInstance).thenReturn(metaServiceProxy); + backendProxyMock.when(BackendServiceProxy::getInstance).thenReturn(backendServiceProxy); + + Map properties = new HashMap<>(); + properties.put("compaction_policy", "time_series"); + handler.updateTableProperties(db, "tbl", properties); + } + + ArgumentCaptor updateCaptor = + ArgumentCaptor.forClass(Cloud.UpdateTabletRequest.class); + Mockito.verify(metaServiceProxy, Mockito.times(2)).updateTablet(updateCaptor.capture()); + List updateRequests = updateCaptor.getAllValues(); + Assert.assertEquals(Arrays.asList(101L, 102L), + updateRequests.get(0).getTabletMetaInfosList().stream() + .map(Cloud.TabletMetaInfoPB::getTabletId).collect(Collectors.toList())); + Assert.assertEquals(Arrays.asList(103L), + updateRequests.get(1).getTabletMetaInfosList().stream() + .map(Cloud.TabletMetaInfoPB::getTabletId).collect(Collectors.toList())); + + ArgumentCaptor addressCaptor = 
ArgumentCaptor.forClass(TNetworkAddress.class); + ArgumentCaptor syncCaptor = + ArgumentCaptor.forClass(InternalService.PSyncTabletMetaRequest.class); + Mockito.verify(backendServiceProxy, Mockito.times(4)) + .syncTabletMeta(addressCaptor.capture(), syncCaptor.capture()); + + List addresses = addressCaptor.getAllValues(); + Assert.assertFalse(addresses.stream().anyMatch(addr -> "be-dead".equals(addr.getHostname()))); + Assert.assertEquals(2L, addresses.stream().filter(addr -> "be-1".equals(addr.getHostname())).count()); + Assert.assertEquals(2L, addresses.stream().filter(addr -> "be-2".equals(addr.getHostname())).count()); + + List syncRequests = syncCaptor.getAllValues(); + Assert.assertEquals(Arrays.asList(101L, 102L), syncRequests.get(0).getTabletIdsList()); + Assert.assertEquals(Arrays.asList(101L, 102L), syncRequests.get(1).getTabletIdsList()); + Assert.assertEquals(Arrays.asList(103L), syncRequests.get(2).getTabletIdsList()); + Assert.assertEquals(Arrays.asList(103L), syncRequests.get(3).getTabletIdsList()); + } + + @Test + public void testNotifyBackendsToSyncTabletMetaSwallowsDispatchFailure() throws Exception { + CloudSchemaChangeHandler handler = new CloudSchemaChangeHandler(); + Backend aliveBackend1 = new Backend(1L, "be-1", 9050); + aliveBackend1.setAlive(true); + aliveBackend1.setBrpcPort(8060); + Backend aliveBackend2 = new Backend(2L, "be-2", 9050); + aliveBackend2.setAlive(true); + aliveBackend2.setBrpcPort(8061); + + SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class); + Mockito.when(systemInfoService.getAllBackendsByAllCluster()).thenReturn(ImmutableMap.of( + aliveBackend1.getId(), aliveBackend1, + aliveBackend2.getId(), aliveBackend2)); + + BackendServiceProxy backendServiceProxy = Mockito.mock(BackendServiceProxy.class); + Mockito.when(backendServiceProxy.syncTabletMeta(Mockito.argThat( + addr -> addr != null && "be-1".equals(addr.getHostname())), Mockito.any())) + 
.thenReturn(Futures.immediateFuture(okSyncTabletMetaResponse())); + Mockito.when(backendServiceProxy.syncTabletMeta(Mockito.argThat( + addr -> addr != null && "be-2".equals(addr.getHostname())), Mockito.any())) + .thenThrow(new org.apache.doris.rpc.RpcException("be-2", "dispatch failed")); + + try (MockedStatic envMock = Mockito.mockStatic(Env.class); + MockedStatic backendProxyMock = Mockito.mockStatic(BackendServiceProxy.class)) { + envMock.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService); + backendProxyMock.when(BackendServiceProxy::getInstance).thenReturn(backendServiceProxy); + + handler.notifyBackendsToSyncTabletMeta(Arrays.asList(101L, 102L)); + } + + Mockito.verify(backendServiceProxy, Mockito.times(2)) + .syncTabletMeta(Mockito.any(), Mockito.any()); + } + + @Test + public void testNotifyBackendsToSyncTabletMetaSwallowsNonOkResponse() throws Exception { + CloudSchemaChangeHandler handler = new CloudSchemaChangeHandler(); + Backend aliveBackend1 = new Backend(1L, "be-1", 9050); + aliveBackend1.setAlive(true); + aliveBackend1.setBrpcPort(8060); + Backend aliveBackend2 = new Backend(2L, "be-2", 9050); + aliveBackend2.setAlive(true); + aliveBackend2.setBrpcPort(8061); + + SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class); + Mockito.when(systemInfoService.getAllBackendsByAllCluster()).thenReturn(ImmutableMap.of( + aliveBackend1.getId(), aliveBackend1, + aliveBackend2.getId(), aliveBackend2)); + + BackendServiceProxy backendServiceProxy = Mockito.mock(BackendServiceProxy.class); + Mockito.when(backendServiceProxy.syncTabletMeta(Mockito.any(), Mockito.any())) + .thenReturn(Futures.immediateFuture(InternalService.PSyncTabletMetaResponse.newBuilder() + .setStatus(org.apache.doris.proto.Types.PStatus.newBuilder() + .setStatusCode(TStatusCode.CANCELLED.getValue())) + .setFailedTablets(2) + .build())); + + try (MockedStatic envMock = Mockito.mockStatic(Env.class); + MockedStatic backendProxyMock = 
Mockito.mockStatic(BackendServiceProxy.class)) { + envMock.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService); + backendProxyMock.when(BackendServiceProxy::getInstance).thenReturn(backendServiceProxy); + + handler.notifyBackendsToSyncTabletMeta(Arrays.asList(101L, 102L)); + } + + Mockito.verify(backendServiceProxy, Mockito.times(2)) + .syncTabletMeta(Mockito.any(), Mockito.any()); + } + + @Test + public void testNotifyBackendsToSyncTabletMetaReturnsWhenBackendDiscoveryFails() throws Exception { + CloudSchemaChangeHandler handler = new CloudSchemaChangeHandler(); + SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class); + Mockito.when(systemInfoService.getAllBackendsByAllCluster()) + .thenThrow(new org.apache.doris.common.AnalysisException("backend discovery failed")); + + BackendServiceProxy backendServiceProxy = Mockito.mock(BackendServiceProxy.class); + + try (MockedStatic envMock = Mockito.mockStatic(Env.class); + MockedStatic backendProxyMock = Mockito.mockStatic(BackendServiceProxy.class)) { + envMock.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService); + backendProxyMock.when(BackendServiceProxy::getInstance).thenReturn(backendServiceProxy); + + handler.notifyBackendsToSyncTabletMeta(Arrays.asList(101L, 102L)); + } + + Mockito.verifyNoInteractions(backendServiceProxy); + } + + private Cloud.UpdateTabletResponse okUpdateTabletResponse() { + return Cloud.UpdateTabletResponse.newBuilder() + .setStatus(Cloud.MetaServiceResponseStatus.newBuilder().setCode(Cloud.MetaServiceCode.OK)) + .build(); + } + + private InternalService.PSyncTabletMetaResponse okSyncTabletMetaResponse() { + return InternalService.PSyncTabletMetaResponse.newBuilder() + .setStatus(org.apache.doris.proto.Types.PStatus.newBuilder() + .setStatusCode(TStatusCode.OK.getValue())) + .setSyncedTablets(1) + .build(); + } +} diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 808a70e327a9ba..66d2eb1b5aae04 100644 --- 
a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -1071,6 +1071,17 @@ message PAlterVaultSyncRequest { message PAlterVaultSyncResponse { } +message PSyncTabletMetaRequest { + repeated int64 tablet_ids = 1; +} + +message PSyncTabletMetaResponse { + optional PStatus status = 1; + optional int64 synced_tablets = 2; + optional int64 skipped_tablets = 3; + optional int64 failed_tablets = 4; +} + message PGetBeResourceRequest { } @@ -1223,6 +1234,7 @@ service PBackendService { rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns (PFetchRemoteSchemaResponse); rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns (PJdbcTestConnectionResult); rpc alter_vault_sync(PAlterVaultSyncRequest) returns (PAlterVaultSyncResponse); + rpc sync_tablet_meta(PSyncTabletMetaRequest) returns (PSyncTabletMetaResponse); rpc get_be_resource(PGetBeResourceRequest) returns (PGetBeResourceResponse); rpc delete_dictionary(PDeleteDictionaryRequest) returns (PDeleteDictionaryResponse); rpc commit_refresh_dictionary(PCommitRefreshDictionaryRequest) returns (PCommitRefreshDictionaryResponse); @@ -1231,4 +1243,3 @@ service PBackendService { rpc fetch_peer_data(PFetchPeerDataRequest) returns (PFetchPeerDataResponse); rpc request_cdc_client(PRequestCdcClientRequest) returns (PRequestCdcClientResult); }; - diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_sync_tablet_meta_after_alter.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sync_tablet_meta_after_alter.groovy new file mode 100644 index 00000000000000..17bdeae2d12655 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sync_tablet_meta_after_alter.groovy @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_sync_tablet_meta_after_alter", "nonConcurrent") { + if (!isCloudMode()) { + return + } + if (!getFeConfig("enable_debug_points").equalsIgnoreCase("true")) { + logger.info("enable_debug_points=false, skip") + return + } + if (context.config.multiClusterBes == null || context.config.multiClusterBes.trim().isEmpty()) { + logger.info("multiClusterBes is empty, skip") + return + } + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def cluster0 = "sync_meta_cluster0" + def cluster1 = "sync_meta_cluster1" + def clusterId0 = "sync_meta_cluster_id0" + def clusterId1 = "sync_meta_cluster_id1" + def dbName = "reg_sync_tablet_meta_db" + def tableName = "test_cloud_sync_tablet_meta_after_alter" + def versionLimitTableName = "test_cloud_sync_tablet_meta_version_limit_after_alter" + + List ipList = [] + List hbPortList = [] + List beUniqueIdList = [] + + String[] bes = context.config.multiClusterBes.split(',') + for (String values : bes) { + if (ipList.size() == 2) { + break + } + String[] beInfo = values.split(':') + ipList.add(beInfo[0]) + hbPortList.add(beInfo[1]) + beUniqueIdList.add(beInfo[3]) + } + assertTrue(ipList.size() >= 2, "need at least two backends in multiClusterBes") + + def getBackendForCluster = { String clusterName -> + def backend = sql_return_maparray("show backends").find { it.Tag.contains(clusterName) } + 
assert backend != null: "backend for cluster ${clusterName} not found" + return backend + } + + def getBvar = { backend, String name -> + def url = "http://${backend.Host}:${backend.BrpcPort}/vars/${name}" + def (code, out, err) = curl("GET", url) + assertEquals(0, code) + def matcher = (out =~ /(?m)^${java.util.regex.Pattern.quote(name)}\s*:\s*(\d+)$/) // slashy strings keep backslashes verbatim, so \s and \d are written once + assertTrue(matcher.find(), "failed to parse bvar ${name} from ${url}, out=${out}, err=${err}") + return matcher.group(1).toLong() + } + + def waitForBvarIncrease = { backend, String name, long baseline, long delta -> + for (int retry = 0; retry < 20; retry++) { + def current = getBvar(backend, name) + logger.info("wait bvar ${name} on ${backend.Host}:${backend.BrpcPort}, baseline=${baseline}, current=${current}") + if (current >= baseline + delta) { + return current + } + sleep(500) + } + return getBvar(backend, name) + } + + def getCompactionPolicy = { String currentTableName -> + def tablets = sql_return_maparray("""show tablets from ${currentTableName};""") + assertEquals(1, tablets.size()) + def (code, out, err) = curl("GET", tablets[0].CompactionStatus) + assertEquals(0, code) + def tabletJson = parseJson(out.trim()) + return tabletJson["compaction policy"] + } + + def useCluster = { String clusterName -> + sql """use @${clusterName}""" + sql """use ${dbName}""" + } + + def cleanupClusters = { + for (uniqueId in beUniqueIdList) { + def resp = get_cluster.call(uniqueId) + for (cluster in resp) { + if (cluster.type == "COMPUTE") { + drop_cluster.call(cluster.cluster_name, cluster.cluster_id) + } + } + } + wait_cluster_change() + } + + setBeConfigTemporary([ + tablet_sync_interval_s: 18000, + schedule_sync_tablets_interval_s: 18000 + ]) { + try { + cleanupClusters.call() + add_cluster.call(beUniqueIdList[0], ipList[0], hbPortList[0], cluster0, clusterId0) + add_cluster.call(beUniqueIdList[1], ipList[1], hbPortList[1], cluster1, clusterId1) + wait_cluster_change() + + sql """use @${cluster0}""" + sql """create 
database if not exists ${dbName}""" + sql """use ${dbName}""" + sql """drop table if exists ${tableName} force""" + sql """drop table if exists ${versionLimitTableName} force""" + sql """ + CREATE TABLE ${tableName} ( + id INT NOT NULL, + value INT + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ) + """ + sql """insert into ${tableName} values (1, 10), (2, 20), (3, 30)""" + + def be0 = getBackendForCluster(cluster0) + def be1 = getBackendForCluster(cluster1) + + // Case 1: warm only cluster0, alter property, and verify cluster1 is notified but skipped. + useCluster.call(cluster0) + qt_case1_cluster0_warmup """select count(*) from ${tableName}""" + assertEquals("size_based", getCompactionPolicy(tableName)) + + def case1Synced0 = getBvar(be0, "cloud_sync_tablet_meta_synced_total") + def case1Skipped1 = getBvar(be1, "cloud_sync_tablet_meta_skipped_total") + + sql """alter table ${tableName} set ("compaction_policy" = "time_series")""" + + assertTrue(waitForBvarIncrease(be0, "cloud_sync_tablet_meta_synced_total", case1Synced0, 1) >= case1Synced0 + 1) + assertTrue(waitForBvarIncrease(be1, "cloud_sync_tablet_meta_skipped_total", case1Skipped1, 1) >= case1Skipped1 + 1) + + useCluster.call(cluster0) + qt_case1_cluster0_after_alter """select count(*) from ${tableName}""" + assertEquals("time_series", getCompactionPolicy(tableName)) + + useCluster.call(cluster1) + qt_case1_cluster1_first_access """select count(*) from ${tableName}""" + assertEquals("time_series", getCompactionPolicy(tableName)) + + // Case 2: warm both clusters, alter property again, and verify both clusters sync immediately. 
+ useCluster.call(cluster0) + qt_case2_cluster0_warmup """select count(*) from ${tableName}""" + useCluster.call(cluster1) + qt_case2_cluster1_warmup """select count(*) from ${tableName}""" + + def case2Synced0 = getBvar(be0, "cloud_sync_tablet_meta_synced_total") + def case2Synced1 = getBvar(be1, "cloud_sync_tablet_meta_synced_total") + + useCluster.call(cluster0) + sql """alter table ${tableName} set ("compaction_policy" = "size_based")""" + + assertTrue(waitForBvarIncrease(be0, "cloud_sync_tablet_meta_synced_total", case2Synced0, 1) >= case2Synced0 + 1) + assertTrue(waitForBvarIncrease(be1, "cloud_sync_tablet_meta_synced_total", case2Synced1, 1) >= case2Synced1 + 1) + + useCluster.call(cluster0) + assertEquals("size_based", getCompactionPolicy(tableName)) + useCluster.call(cluster1) + assertEquals("size_based", getCompactionPolicy(tableName)) + + // Case 3: hit the size_based version limit first, then alter to time_series and verify writes succeed immediately. + useCluster.call(cluster0) + sql """drop table if exists ${versionLimitTableName} force""" + sql """ + CREATE TABLE ${versionLimitTableName} ( + id BIGINT NOT NULL, + value BIGINT + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ) + """ + qt_case3_version_limit_warmup """select count(*) from ${versionLimitTableName}""" + assertEquals("size_based", getCompactionPolicy(versionLimitTableName)) + + int successfulBeforeAlter = 0 + String versionLimitFailure = null + for (int i = 0; i < 2400; i++) { + try { + sql """insert into ${versionLimitTableName} values (${i}, ${i})""" + successfulBeforeAlter++ + if ((i + 1) % 200 == 0) { + logger.info("inserted {} versions into {}", i + 1, versionLimitTableName) + } + } catch (Exception e) { + versionLimitFailure = e.getMessage() + logger.info("hit version limit after {} successful inserts, msg={}", + successfulBeforeAlter, versionLimitFailure) + break + } + } + 
assertTrue(versionLimitFailure != null, "expected version limit failure before alter") + assertTrue(versionLimitFailure.contains("-235") + || versionLimitFailure.toLowerCase().contains("too many versions") + || versionLimitFailure.toLowerCase().contains("version count") + || versionLimitFailure.toLowerCase().contains("exceed limit"), + "unexpected failure message: ${versionLimitFailure}") + + def versionLimitTablets = sql_return_maparray("""show tablets from ${versionLimitTableName};""") + assertEquals(1, versionLimitTablets.size()) + long versionCountBeforeAlter = versionLimitTablets[0].VersionCount.toLong() + assertTrue(versionCountBeforeAlter >= 2000, + "expected version count near max limit, actual=${versionCountBeforeAlter}") + + sql """alter table ${versionLimitTableName} set ("compaction_policy" = "time_series")""" + assertEquals("time_series", getCompactionPolicy(versionLimitTableName)) + + int successfulAfterAlter = 0 + for (int i = 0; i < 50; i++) { + long rowId = 100000L + i + sql """insert into ${versionLimitTableName} values (${rowId}, ${rowId})""" + successfulAfterAlter++ + } + assertEquals(50, successfulAfterAlter) + def versionLimitTabletsAfterAlter = sql_return_maparray("""show tablets from ${versionLimitTableName};""") + assertEquals(1, versionLimitTabletsAfterAlter.size()) + assertTrue(versionLimitTabletsAfterAlter[0].VersionCount.toLong() > versionCountBeforeAlter, + "expected version count to keep increasing immediately after alter") + qt_case3_version_limit_after_alter """select count(*) from ${versionLimitTableName}""" + + // Case 4: disable FE proactive notification and verify cached tablets remain stale. 
+ GetDebugPoint().enableDebugPointForAllFEs("CloudSchemaChangeHandler.notifyBackendsToSyncTabletMeta.skip") + def case4Request0 = getBvar(be0, "cloud_sync_tablet_meta_requests_total") + def case4Request1 = getBvar(be1, "cloud_sync_tablet_meta_requests_total") + def case4Synced0 = getBvar(be0, "cloud_sync_tablet_meta_synced_total") + def case4Synced1 = getBvar(be1, "cloud_sync_tablet_meta_synced_total") + def case4Skipped0 = getBvar(be0, "cloud_sync_tablet_meta_skipped_total") + def case4Skipped1 = getBvar(be1, "cloud_sync_tablet_meta_skipped_total") + + useCluster.call(cluster0) + sql """alter table ${tableName} set ("compaction_policy" = "time_series")""" + sleep(2000) + + assertEquals(case4Request0, getBvar(be0, "cloud_sync_tablet_meta_requests_total")) + assertEquals(case4Request1, getBvar(be1, "cloud_sync_tablet_meta_requests_total")) + assertEquals(case4Synced0, getBvar(be0, "cloud_sync_tablet_meta_synced_total")) + assertEquals(case4Synced1, getBvar(be1, "cloud_sync_tablet_meta_synced_total")) + assertEquals(case4Skipped0, getBvar(be0, "cloud_sync_tablet_meta_skipped_total")) + assertEquals(case4Skipped1, getBvar(be1, "cloud_sync_tablet_meta_skipped_total")) + + useCluster.call(cluster0) + assertEquals("size_based", getCompactionPolicy(tableName)) + useCluster.call(cluster1) + assertEquals("size_based", getCompactionPolicy(tableName)) + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + try { + sql """use @${cluster0}""" + sql """drop table if exists ${dbName}.${tableName} force""" + sql """drop table if exists ${dbName}.${versionLimitTableName} force""" + } catch (Exception e) { + logger.info("drop table in finally failed: ${e.getMessage()}") + } + cleanupClusters.call() + } + } +}