diff --git a/be/src/cloud/cloud_internal_service.cpp b/be/src/cloud/cloud_internal_service.cpp index 99b08a9bf38564..573cbf2e96a1e2 100644 --- a/be/src/cloud/cloud_internal_service.cpp +++ b/be/src/cloud/cloud_internal_service.cpp @@ -49,12 +49,57 @@ bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency( "file_cache_get_by_peer_read_cache_file_latency"); bvar::LatencyRecorder g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency( "cloud_internal_service_get_file_cache_meta_by_tablet_id_latency"); +bvar::Adder<int64_t> g_cloud_sync_tablet_meta_requests_total( + "cloud_sync_tablet_meta_requests_total"); +bvar::Adder<int64_t> g_cloud_sync_tablet_meta_synced_total("cloud_sync_tablet_meta_synced_total"); +bvar::Adder<int64_t> g_cloud_sync_tablet_meta_skipped_total("cloud_sync_tablet_meta_skipped_total"); +bvar::Adder<int64_t> g_cloud_sync_tablet_meta_failed_total("cloud_sync_tablet_meta_failed_total"); CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env) : PInternalService(exec_env), _engine(engine) {} CloudInternalServiceImpl::~CloudInternalServiceImpl() = default; +void CloudInternalServiceImpl::sync_tablet_meta(google::protobuf::RpcController* controller, + const PSyncTabletMetaRequest* request, + PSyncTabletMetaResponse* response, + google::protobuf::Closure* done) { + bool ret = _light_work_pool.try_offer([this, request, response, done]() { + brpc::ClosureGuard closure_guard(done); + int64_t synced = 0; + int64_t skipped = 0; + int64_t failed = 0; + g_cloud_sync_tablet_meta_requests_total << 1; + for (const auto tablet_id : request->tablet_ids()) { + auto tablet = _engine.tablet_mgr().get_tablet_if_cached(tablet_id); + if (!tablet) { + ++skipped; + continue; + } + auto st = tablet->sync_meta(); + if (!st.ok()) { + ++failed; + LOG(WARNING) << "failed to sync tablet meta from cloud meta service, tablet=" + << tablet_id << ", err=" << st; + continue; + } + ++synced; + } + g_cloud_sync_tablet_meta_synced_total << synced; + 
g_cloud_sync_tablet_meta_skipped_total << skipped; + g_cloud_sync_tablet_meta_failed_total << failed; + response->set_synced_tablets(synced); + response->set_skipped_tablets(skipped); + response->set_failed_tablets(failed); + Status::OK().to_protobuf(response->mutable_status()); + }); + if (!ret) { + brpc::ClosureGuard closure_guard(done); + Status::InternalError("failed to offer sync_tablet_meta request to work pool") + .to_protobuf(response->mutable_status()); + } +} + void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* controller, const doris::PAlterVaultSyncRequest* request, PAlterVaultSyncResponse* response, diff --git a/be/src/cloud/cloud_internal_service.h b/be/src/cloud/cloud_internal_service.h index db4916313fe596..d239d891738b4a 100644 --- a/be/src/cloud/cloud_internal_service.h +++ b/be/src/cloud/cloud_internal_service.h @@ -34,6 +34,10 @@ class CloudInternalServiceImpl final : public PInternalService { PAlterVaultSyncResponse* response, google::protobuf::Closure* done) override; + void sync_tablet_meta(google::protobuf::RpcController* controller, + const PSyncTabletMetaRequest* request, PSyncTabletMetaResponse* response, + google::protobuf::Closure* done) override; + // Get messages (filename, offset, size) about the tablet data in cache void get_file_cache_meta_by_tablet_id(google::protobuf::RpcController* controller, const PGetFileCacheMetaRequest* request, diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp index 2567019301cff5..0eb7181c7e26a9 100644 --- a/be/src/cloud/cloud_tablet.cpp +++ b/be/src/cloud/cloud_tablet.cpp @@ -868,6 +868,12 @@ void CloudTablet::get_compaction_status(std::string* json_result) { // get snapshot version path json_doc _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr); + rapidjson::Value compaction_policy_value; + auto compaction_policy = _tablet_meta->compaction_policy(); + compaction_policy_value.SetString(compaction_policy.c_str(), + 
cast_set<uint32_t>(compaction_policy.length()), + root.GetAllocator()); + root.AddMember("compaction policy", compaction_policy_value, root.GetAllocator()); root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator()); rapidjson::Value cumu_value; std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load()); @@ -1376,10 +1382,6 @@ void CloudTablet::agg_delete_bitmap_for_compaction( } Status CloudTablet::sync_meta() { - if (!config::enable_file_cache) { - return Status::OK(); - } - TabletMetaSharedPtr tablet_meta; auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta); if (!st.ok()) { @@ -1393,6 +1395,10 @@ Status CloudTablet::sync_meta() { if (_tablet_meta->compaction_policy() != new_compaction_policy) { _tablet_meta->set_compaction_policy(new_compaction_policy); } + auto new_ttl_seconds = tablet_meta->ttl_seconds(); + if (_tablet_meta->ttl_seconds() != new_ttl_seconds) { + _tablet_meta->set_ttl_seconds(new_ttl_seconds); + } auto new_time_series_compaction_goal_size_mbytes = tablet_meta->time_series_compaction_goal_size_mbytes(); if (_tablet_meta->time_series_compaction_goal_size_mbytes() != diff --git a/be/src/cloud/cloud_tablet_mgr.cpp b/be/src/cloud/cloud_tablet_mgr.cpp index 3e979864138645..6d0648f9233413 100644 --- a/be/src/cloud/cloud_tablet_mgr.cpp +++ b/be/src/cloud/cloud_tablet_mgr.cpp @@ -288,6 +288,10 @@ bool CloudTabletMgr::peek_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* ta return true; } +std::shared_ptr<CloudTablet> CloudTabletMgr::get_tablet_if_cached(int64_t tablet_id) { + return _tablet_map->get(tablet_id); +} + void CloudTabletMgr::erase_tablet(int64_t tablet_id) { auto tablet_id_str = std::to_string(tablet_id); CacheKey key(tablet_id_str.data(), tablet_id_str.size()); diff --git a/be/src/cloud/cloud_tablet_mgr.h b/be/src/cloud/cloud_tablet_mgr.h index 9894d97552b872..b9b6d06e12efb5 100644 --- a/be/src/cloud/cloud_tablet_mgr.h +++ b/be/src/cloud/cloud_tablet_mgr.h @@ -62,6 +62,8 @@ class 
CloudTabletMgr { // Return true if cached tablet meta is found (without triggering RPC) and filled. bool peek_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta); + std::shared_ptr<CloudTablet> get_tablet_if_cached(int64_t tablet_id); + void erase_tablet(int64_t tablet_id); void vacuum_stale_rowsets(const CountDownLatch& stop_latch); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 2f2250f265db90..e46526fe0cea90 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -2530,5 +2530,14 @@ void PInternalService::request_cdc_client(google::protobuf::RpcController* contr } } +void PInternalService::sync_tablet_meta(google::protobuf::RpcController* controller, + const PSyncTabletMetaRequest* request, + PSyncTabletMetaResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard closure_guard(done); + Status::NotSupported("sync_tablet_meta only supports cloud mode") + .to_protobuf(response->mutable_status()); +} + #include "common/compile_check_avoid_end.h" } // namespace doris diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index b1e11ec34b73c4..3042d2cae3a105 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -223,6 +223,10 @@ class PInternalService : public PBackendService { const PGetBeResourceRequest* request, PGetBeResourceResponse* response, google::protobuf::Closure* done) override; + void sync_tablet_meta(google::protobuf::RpcController* controller, + const PSyncTabletMetaRequest* request, PSyncTabletMetaResponse* response, + google::protobuf::Closure* done) override; + void delete_dictionary(google::protobuf::RpcController* controller, const PDeleteDictionaryRequest* request, PDeleteDictionaryResponse* response, diff --git a/be/test/cloud/cloud_internal_service_test.cpp b/be/test/cloud/cloud_internal_service_test.cpp new file mode 100644 index 00000000000000..fb2610c3b96245 --- /dev/null +++ 
b/be/test/cloud/cloud_internal_service_test.cpp @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_internal_service.h" + +#include <gtest/gtest.h> +#include <chrono> +#include <condition_variable> +#include <functional> +#include <mutex> + +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet_mgr.h" +#include "cpp/sync_point.h" +#include "gen_cpp/Status_types.h" +#include "storage/tablet/tablet_meta.h" +#include "util/uid_util.h" + +namespace doris { + +namespace { + +class TestClosure final : public google::protobuf::Closure { +public: + explicit TestClosure(std::function<void()> callback) : _callback(std::move(callback)) {} + + void Run() override { + if (_callback) { + _callback(); + } + } + +private: + std::function<void()> _callback; +}; + +} // namespace + +class CloudInternalServiceTest : public testing::Test { +public: + CloudInternalServiceTest() : _engine(CloudStorageEngine(EngineOptions())) {} + +protected: + static constexpr int64_t kSyncedTabletId = 90001; + static constexpr int64_t kFailedTabletId = 90002; + static constexpr int64_t kUncachedTabletId = 90003; + + void SetUp() override { + auto sp = SyncPoint::get_instance(); + sp->clear_all_call_backs(); + sp->enable_processing(); + + 
sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", + [](auto&& args) { try_any_cast_ret<Status>(args)->second = true; }); + sp->set_call_back("CloudMetaMgr::get_tablet_meta", [this](auto&& args) { + const auto tablet_id = try_any_cast<int64_t>(args[0]); + auto* tablet_meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]); + auto* ret = try_any_cast_ret<Status>(args); + + auto& call_count = _tablet_meta_call_count[tablet_id]; + ++call_count; + if (tablet_id == kFailedTabletId && call_count >= 2) { + ret->first = Status::InternalError("injected sync_meta failure"); + ret->second = true; + return; + } + + *tablet_meta_ptr = + create_tablet_meta(tablet_id, call_count >= 2 ? "time_series" : "size_based"); + ret->second = true; + }); + } + + void TearDown() override { + auto sp = SyncPoint::get_instance(); + sp->disable_processing(); + sp->clear_all_call_backs(); + } + + TabletMetaSharedPtr create_tablet_meta(int64_t tablet_id, std::string compaction_policy) { + TTabletSchema tablet_schema; + TColumn column; + column.__set_column_name("c1"); + column.__set_column_type(TColumnType()); + column.column_type.__set_type(TPrimitiveType::INT); + column.__set_is_key(true); + column.__set_aggregation_type(TAggregationType::NONE); + column.__set_col_unique_id(0); + tablet_schema.__set_columns({column}); + tablet_schema.__set_keys_type(TKeysType::DUP_KEYS); + + std::unordered_map<uint32_t, uint32_t> col_ordinal_to_unique_id = {{0, 0}}; + auto tablet_meta = std::make_shared<TabletMeta>( + 1, 2, tablet_id, 15674, 4, 5, tablet_schema, 1, col_ordinal_to_unique_id, + UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F); + tablet_meta->set_compaction_policy(std::move(compaction_policy)); + return tablet_meta; + } + + CloudStorageEngine _engine; + std::unordered_map<int64_t, int> _tablet_meta_call_count; +}; + +TEST_F(CloudInternalServiceTest, TestSyncTabletMetaCountsSyncedSkippedAndFailedTablets) { + auto synced_res = _engine.tablet_mgr().get_tablet(kSyncedTabletId); + ASSERT_TRUE(synced_res.has_value()) << synced_res.error(); + auto failed_res = 
_engine.tablet_mgr().get_tablet(kFailedTabletId); + ASSERT_TRUE(failed_res.has_value()) << failed_res.error(); + + auto cached_synced_tablet = synced_res.value(); + auto cached_failed_tablet = failed_res.value(); + ASSERT_EQ("size_based", cached_synced_tablet->tablet_meta()->compaction_policy()); + ASSERT_EQ("size_based", cached_failed_tablet->tablet_meta()->compaction_policy()); + + CloudInternalServiceImpl service(_engine, nullptr); + PSyncTabletMetaRequest request; + request.add_tablet_ids(kSyncedTabletId); + request.add_tablet_ids(kUncachedTabletId); + request.add_tablet_ids(kFailedTabletId); + PSyncTabletMetaResponse response; + + std::mutex mutex; + std::condition_variable cv; + bool done = false; + TestClosure closure([&]() { + std::lock_guard lock(mutex); + done = true; + cv.notify_all(); + }); + + service.sync_tablet_meta(nullptr, &request, &response, &closure); + + std::unique_lock lock(mutex); + ASSERT_TRUE(cv.wait_for(lock, std::chrono::seconds(5), [&] { return done; })); + + ASSERT_TRUE(response.has_status()); + EXPECT_EQ(TStatusCode::OK, response.status().status_code()) << response.status().DebugString(); + EXPECT_EQ(1, response.synced_tablets()); + EXPECT_EQ(1, response.skipped_tablets()); + EXPECT_EQ(1, response.failed_tablets()); + EXPECT_EQ("time_series", cached_synced_tablet->tablet_meta()->compaction_policy()); + EXPECT_EQ("size_based", cached_failed_tablet->tablet_meta()->compaction_policy()); + EXPECT_EQ(nullptr, _engine.tablet_mgr().get_tablet_if_cached(kUncachedTabletId)); +} + +} // namespace doris diff --git a/be/test/cloud/cloud_tablet_mgr_test.cpp b/be/test/cloud/cloud_tablet_mgr_test.cpp index 3c9e0b26eb1059..c893b72a475c7f 100644 --- a/be/test/cloud/cloud_tablet_mgr_test.cpp +++ b/be/test/cloud/cloud_tablet_mgr_test.cpp @@ -178,4 +178,36 @@ TEST_F(CloudTabletMgrTest, TestConcurrentGetTabletTabletMapConsistency) { sp->clear_all_call_backs(); } +TEST_F(CloudTabletMgrTest, TestGetTabletIfCachedOnlyReturnsCachedTablet) { + auto sp = 
SyncPoint::get_instance(); + sp->clear_all_call_backs(); + sp->enable_processing(); + + sp->set_call_back("CloudMetaMgr::get_tablet_meta", [this](auto&& args) { + auto* tablet_meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]); + *tablet_meta_ptr = _tablet_meta; + try_any_cast_ret<Status>(args)->second = true; + }); + sp->set_call_back("CloudMetaMgr::sync_tablet_rowsets", + [](auto&& args) { try_any_cast_ret<Status>(args)->second = true; }); + + CloudTabletMgr mgr(_engine); + const int64_t cached_tablet_id = 99999; + const int64_t uncached_tablet_id = 100000; + + EXPECT_EQ(nullptr, mgr.get_tablet_if_cached(cached_tablet_id)); + + auto res = mgr.get_tablet(cached_tablet_id); + ASSERT_TRUE(res.has_value()) << res.error(); + + auto cached_tablet = mgr.get_tablet_if_cached(cached_tablet_id); + ASSERT_NE(nullptr, cached_tablet); + EXPECT_EQ(cached_tablet_id, cached_tablet->tablet_id()); + + EXPECT_EQ(nullptr, mgr.get_tablet_if_cached(uncached_tablet_id)); + + sp->disable_processing(); + sp->clear_all_call_backs(); +} + } // namespace doris diff --git a/be/test/cloud/cloud_tablet_test.cpp b/be/test/cloud/cloud_tablet_test.cpp index 66a07cbc29606a..336d0420769adf 100644 --- a/be/test/cloud/cloud_tablet_test.cpp +++ b/be/test/cloud/cloud_tablet_test.cpp @@ -721,6 +721,7 @@ class CloudTabletSyncMetaTest : public testing::Test { const std::string& compaction_policy = "size_based") { TTabletSchema new_schema; new_schema.__set_disable_auto_compaction(disable_auto_compaction); + new_schema.__set_is_in_memory(false); TColumn col; col.__set_column_name("test_col_" + std::to_string(_current_tablet_id)); col.__set_column_type(TColumnType()); @@ -878,8 +879,8 @@ TEST_F(CloudTabletSyncMetaTest, TestSyncMetaDisableAutoCompactionUnchanged) { sp->clear_all_call_backs(); } -// Test sync_meta is skipped when enable_file_cache is false -TEST_F(CloudTabletSyncMetaTest, TestSyncMetaSkippedWhenFileCacheDisabled) { +// sync_meta should keep refreshing tablet meta even when file cache is disabled. 
+TEST_F(CloudTabletSyncMetaTest, TestSyncMetaWhenFileCacheDisabled) { // Disable file cache config::enable_file_cache = false; @@ -890,17 +891,22 @@ TEST_F(CloudTabletSyncMetaTest, TestSyncMetaSkippedWhenFileCacheDisabled) { sp->clear_all_call_backs(); sp->enable_processing(); + auto mock_tablet_meta = createMockTabletMeta(true, "time_series"); bool callback_called = false; sp->set_call_back("CloudMetaMgr::get_tablet_meta", - [&callback_called](auto&& args) { callback_called = true; }); + [mock_tablet_meta, &callback_called](auto&& args) { + callback_called = true; + auto* tablet_meta_ptr = try_any_cast<TabletMetaSharedPtr*>(args[1]); + *tablet_meta_ptr = mock_tablet_meta; + try_any_cast_ret<Status>(args)->second = true; + }); - // Call sync_meta - should return early without calling get_tablet_meta Status st = _tablet->sync_meta(); EXPECT_TRUE(st.ok()); - EXPECT_FALSE(callback_called); + EXPECT_TRUE(callback_called); - // Verify disable_auto_compaction is not changed - EXPECT_FALSE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()); + EXPECT_TRUE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()); + EXPECT_EQ(_tablet->tablet_meta()->compaction_policy(), "time_series"); sp->disable_processing(); sp->clear_all_call_backs(); @@ -940,6 +946,37 @@ TEST_F(CloudTabletSyncMetaTest, TestSyncMetaMultipleProperties) { sp->disable_processing(); sp->clear_all_call_backs(); } + +TEST_F(CloudTabletSyncMetaTest, TestSyncMetaSyncsTtlAndInMemory) { + EXPECT_EQ(0, _tablet->tablet_meta()->ttl_seconds()); + EXPECT_FALSE(_tablet->tablet_meta()->tablet_schema()->is_in_memory()); + + auto sp = SyncPoint::get_instance(); + sp->clear_all_call_backs(); + sp->enable_processing(); + + auto mock_tablet_meta = createMockTabletMeta(true, "time_series"); + mock_tablet_meta->set_ttl_seconds(3600); + mock_tablet_meta->mutable_tablet_schema()->set_is_in_memory(true); + + sp->set_call_back("CloudMetaMgr::get_tablet_meta", [mock_tablet_meta](auto&& args) { + auto* tablet_meta_ptr = 
try_any_cast<TabletMetaSharedPtr*>(args[1]); + *tablet_meta_ptr = mock_tablet_meta; + try_any_cast_ret<Status>(args)->second = true; + }); + + Status st = _tablet->sync_meta(); + EXPECT_TRUE(st.ok()); + + EXPECT_EQ(3600, _tablet->tablet_meta()->ttl_seconds()); + EXPECT_TRUE(_tablet->tablet_meta()->tablet_schema()->is_in_memory()); + EXPECT_TRUE(_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()); + EXPECT_EQ(_tablet->tablet_meta()->compaction_policy(), "time_series"); + + sp->disable_processing(); + sp->clear_all_call_backs(); +} + class CloudTabletApplyVisiblePendingTest : public testing::Test { public: CloudTabletApplyVisiblePendingTest() : _engine(CloudStorageEngine(EngineOptions {})) {} diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java index c2155963273e09..b6d6a6d0337a4d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java @@ -34,12 +34,21 @@ import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DynamicPartitionUtil; import org.apache.doris.common.util.PropertyAnalyzer; +import org.apache.doris.proto.InternalService; +import org.apache.doris.rpc.BackendServiceProxy; import org.apache.doris.service.FrontendOptions; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import 
org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -540,6 +549,60 @@ public void updateCloudPartitionMeta(Database db, if (response.getStatus().getCode() != Cloud.MetaServiceCode.OK) { throw new UserException(response.getStatus().getMsg()); } + + notifyBackendsToSyncTabletMeta(updateTabletReq.getTabletMetaInfosList().stream() + .map(Cloud.TabletMetaInfoPB::getTabletId) + .collect(Collectors.toList())); + } + } + + void notifyBackendsToSyncTabletMeta(List<Long> tabletIds) { + if (tabletIds.isEmpty()) { + return; + } + if (DebugPointUtil.isEnable("CloudSchemaChangeHandler.notifyBackendsToSyncTabletMeta.skip")) { + LOG.info("skip sync tablet meta rpc dispatch by debug point, tabletIds={}", tabletIds); + return; + } + List<Backend> backends; + try { + backends = Env.getCurrentSystemInfo().getAllBackendsByAllCluster().values().asList(); + } catch (UserException e) { + LOG.warn("failed to get alive backends for sync tablet meta, tabletIds={}", tabletIds, e); + return; + } + + InternalService.PSyncTabletMetaRequest request = InternalService.PSyncTabletMetaRequest.newBuilder() + .addAllTabletIds(tabletIds) + .build(); + for (Backend backend : backends) { + if (!backend.isAlive() || backend.getBrpcPort() <= 0) { + continue; + } + try { + ListenableFuture<InternalService.PSyncTabletMetaResponse> future = + BackendServiceProxy.getInstance().syncTabletMeta(backend.getBrpcAddress(), request); + Futures.addCallback(future, new FutureCallback<InternalService.PSyncTabletMetaResponse>() { + @Override + public void onSuccess(InternalService.PSyncTabletMetaResponse response) { + if (response == null || !response.hasStatus() + || response.getStatus().getStatusCode() != TStatusCode.OK.getValue()) { + LOG.warn("sync tablet meta rpc returned non-ok response, backendId={}, tabletIds={}," + + " response={}", + backend.getId(), tabletIds, response); + } + } + + @Override + public void onFailure(Throwable t) { + LOG.warn("sync tablet meta rpc failed, backendId={}, tabletIds={}", + backend.getId(), tabletIds, t); + } + }, MoreExecutors.directExecutor()); + } 
catch (Exception e) { + LOG.warn("failed to dispatch sync tablet meta rpc, backendId={}, tabletIds={}", + backend.getId(), tabletIds, e); + } } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index fc3dac0c214764..87ffc9ae64094b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -192,6 +192,11 @@ public Future<InternalService.PAlterVaultSyncResponse> alterVaultSync( return stub.alterVaultSync(request); } + public ListenableFuture<InternalService.PSyncTabletMetaResponse> syncTabletMeta( + InternalService.PSyncTabletMetaRequest request) { + return stub.syncTabletMeta(request); + } + public Future<InternalService.PGetBeResourceResponse> getBeResource(InternalService.PGetBeResourceRequest request, int timeoutSec) { return stub.withDeadlineAfter(timeoutSec, TimeUnit.SECONDS).getBeResource(request); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index fbdfb3cf223a70..d72514e0afa6b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -541,6 +541,18 @@ public Future<InternalService.PAlterVaultSyncResponse> alterVaultSync(TNetworkAddress address, } } + public ListenableFuture<InternalService.PSyncTabletMetaResponse> syncTabletMeta(TNetworkAddress address, + InternalService.PSyncTabletMetaRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.syncTabletMeta(request); + } catch (Throwable e) { + LOG.warn("failed to sync tablet meta from address={}:{}", address.getHostname(), + address.getPort(), e); + throw new RpcException(address.getHostname(), e.getMessage()); + } + } + public Future<InternalService.PFetchRemoteSchemaResponse> fetchRemoteTabletSchemaAsync( TNetworkAddress address, InternalService.PFetchRemoteSchemaRequest request) throws RpcException { try { diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandlerTest.java new file mode 100644 index 00000000000000..e2ad88a502ada6 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandlerTest.java @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cloud.alter; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.cloud.proto.Cloud; +import org.apache.doris.cloud.rpc.MetaServiceProxy; +import org.apache.doris.common.Config; +import org.apache.doris.proto.InternalService; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.system.Backend; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Futures; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.MockedStatic; +import org.mockito.Mockito; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class CloudSchemaChangeHandlerTest { + private int originalCloudTxnTabletBatchSize; + private String originalCloudUniqueId; + private String originalMetaServiceEndpoint; + + @Before + public void setUp() { + originalCloudTxnTabletBatchSize = Config.cloud_txn_tablet_batch_size; + originalCloudUniqueId = Config.cloud_unique_id; + originalMetaServiceEndpoint = Config.meta_service_endpoint; + Config.cloud_txn_tablet_batch_size = 2; + Config.cloud_unique_id = "cloud-test"; + Config.meta_service_endpoint = "127.0.0.1:20121"; + } + + @After + public void tearDown() { + Config.cloud_txn_tablet_batch_size = originalCloudTxnTabletBatchSize; + Config.cloud_unique_id = originalCloudUniqueId; + Config.meta_service_endpoint = originalMetaServiceEndpoint; + } + + @Test + public void testUpdateTablePropertiesNotifiesAllAliveBackendsInBatches() throws Exception { + 
CloudSchemaChangeHandler handler = new CloudSchemaChangeHandler(); + Database db = Mockito.mock(Database.class); + OlapTable table = Mockito.mock(OlapTable.class); + Partition partition = Mockito.mock(Partition.class); + MaterializedIndex index = Mockito.mock(MaterializedIndex.class); + org.apache.doris.catalog.Tablet tablet1 = Mockito.mock(org.apache.doris.catalog.Tablet.class); + org.apache.doris.catalog.Tablet tablet2 = Mockito.mock(org.apache.doris.catalog.Tablet.class); + org.apache.doris.catalog.Tablet tablet3 = Mockito.mock(org.apache.doris.catalog.Tablet.class); + + Mockito.when(db.getTableOrMetaException("tbl", org.apache.doris.catalog.Table.TableType.OLAP)) + .thenReturn(table); + Mockito.when(table.getName()).thenReturn("tbl"); + Mockito.when(table.getCompactionPolicy()).thenReturn("size_based"); + Mockito.when(table.getKeysType()).thenReturn(org.apache.doris.catalog.KeysType.DUP_KEYS); + Mockito.when(table.getPartitions()).thenReturn(Arrays.asList(partition)); + Mockito.when(table.getPartition("p1")).thenReturn(partition); + Mockito.when(partition.getName()).thenReturn("p1"); + Mockito.when(partition.getMaterializedIndices(MaterializedIndex.IndexExtState.VISIBLE)) + .thenReturn(Arrays.asList(index)); + Mockito.when(index.getTablets()).thenReturn(Arrays.asList(tablet1, tablet2, tablet3)); + Mockito.when(tablet1.getId()).thenReturn(101L); + Mockito.when(tablet2.getId()).thenReturn(102L); + Mockito.when(tablet3.getId()).thenReturn(103L); + + Backend aliveBackend1 = new Backend(1L, "be-1", 9050); + aliveBackend1.setAlive(true); + aliveBackend1.setBrpcPort(8060); + Backend aliveBackend2 = new Backend(2L, "be-2", 9050); + aliveBackend2.setAlive(true); + aliveBackend2.setBrpcPort(8061); + Backend deadBackend = new Backend(3L, "be-dead", 9050); + deadBackend.setAlive(false); + deadBackend.setBrpcPort(8062); + + SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class); + 
Mockito.when(systemInfoService.getAllBackendsByAllCluster()).thenReturn(ImmutableMap.of( + aliveBackend1.getId(), aliveBackend1, + aliveBackend2.getId(), aliveBackend2, + deadBackend.getId(), deadBackend)); + + Env env = Mockito.mock(Env.class); + + MetaServiceProxy metaServiceProxy = Mockito.mock(MetaServiceProxy.class); + Mockito.when(metaServiceProxy.updateTablet(Mockito.any())).thenReturn(okUpdateTabletResponse()); + + BackendServiceProxy backendServiceProxy = Mockito.mock(BackendServiceProxy.class); + Mockito.when(backendServiceProxy.syncTabletMeta(Mockito.any(), Mockito.any())) + .thenReturn(Futures.immediateFuture(okSyncTabletMetaResponse())); + + try (MockedStatic<Env> envMock = Mockito.mockStatic(Env.class); + MockedStatic<MetaServiceProxy> metaProxyMock = Mockito.mockStatic(MetaServiceProxy.class); + MockedStatic<BackendServiceProxy> backendProxyMock = Mockito.mockStatic(BackendServiceProxy.class)) { + envMock.when(Env::getCurrentEnv).thenReturn(env); + envMock.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService); + metaProxyMock.when(MetaServiceProxy::getInstance).thenReturn(metaServiceProxy); + backendProxyMock.when(BackendServiceProxy::getInstance).thenReturn(backendServiceProxy); + + Map<String, String> properties = new HashMap<>(); + properties.put("compaction_policy", "time_series"); + handler.updateTableProperties(db, "tbl", properties); + } + + ArgumentCaptor<Cloud.UpdateTabletRequest> updateCaptor = + ArgumentCaptor.forClass(Cloud.UpdateTabletRequest.class); + Mockito.verify(metaServiceProxy, Mockito.times(2)).updateTablet(updateCaptor.capture()); + List<Cloud.UpdateTabletRequest> updateRequests = updateCaptor.getAllValues(); + Assert.assertEquals(Arrays.asList(101L, 102L), + updateRequests.get(0).getTabletMetaInfosList().stream() + .map(Cloud.TabletMetaInfoPB::getTabletId).collect(Collectors.toList())); + Assert.assertEquals(Arrays.asList(103L), + updateRequests.get(1).getTabletMetaInfosList().stream() + .map(Cloud.TabletMetaInfoPB::getTabletId).collect(Collectors.toList())); + + ArgumentCaptor<TNetworkAddress> addressCaptor = 
ArgumentCaptor.forClass(TNetworkAddress.class); + ArgumentCaptor<InternalService.PSyncTabletMetaRequest> syncCaptor = + ArgumentCaptor.forClass(InternalService.PSyncTabletMetaRequest.class); + Mockito.verify(backendServiceProxy, Mockito.times(4)) + .syncTabletMeta(addressCaptor.capture(), syncCaptor.capture()); + + List<TNetworkAddress> addresses = addressCaptor.getAllValues(); + Assert.assertFalse(addresses.stream().anyMatch(addr -> "be-dead".equals(addr.getHostname()))); + Assert.assertEquals(2L, addresses.stream().filter(addr -> "be-1".equals(addr.getHostname())).count()); + Assert.assertEquals(2L, addresses.stream().filter(addr -> "be-2".equals(addr.getHostname())).count()); + + List<InternalService.PSyncTabletMetaRequest> syncRequests = syncCaptor.getAllValues(); + Assert.assertEquals(Arrays.asList(101L, 102L), syncRequests.get(0).getTabletIdsList()); + Assert.assertEquals(Arrays.asList(101L, 102L), syncRequests.get(1).getTabletIdsList()); + Assert.assertEquals(Arrays.asList(103L), syncRequests.get(2).getTabletIdsList()); + Assert.assertEquals(Arrays.asList(103L), syncRequests.get(3).getTabletIdsList()); + } + + @Test + public void testNotifyBackendsToSyncTabletMetaSwallowsDispatchFailure() throws Exception { + CloudSchemaChangeHandler handler = new CloudSchemaChangeHandler(); + Backend aliveBackend1 = new Backend(1L, "be-1", 9050); + aliveBackend1.setAlive(true); + aliveBackend1.setBrpcPort(8060); + Backend aliveBackend2 = new Backend(2L, "be-2", 9050); + aliveBackend2.setAlive(true); + aliveBackend2.setBrpcPort(8061); + + SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class); + Mockito.when(systemInfoService.getAllBackendsByAllCluster()).thenReturn(ImmutableMap.of( + aliveBackend1.getId(), aliveBackend1, + aliveBackend2.getId(), aliveBackend2)); + + BackendServiceProxy backendServiceProxy = Mockito.mock(BackendServiceProxy.class); + Mockito.when(backendServiceProxy.syncTabletMeta(Mockito.argThat( + addr -> addr != null && "be-1".equals(addr.getHostname())), Mockito.any())) + 
.thenReturn(Futures.immediateFuture(okSyncTabletMetaResponse())); + Mockito.when(backendServiceProxy.syncTabletMeta(Mockito.argThat( + addr -> addr != null && "be-2".equals(addr.getHostname())), Mockito.any())) + .thenThrow(new org.apache.doris.rpc.RpcException("be-2", "dispatch failed")); + + try (MockedStatic envMock = Mockito.mockStatic(Env.class); + MockedStatic backendProxyMock = Mockito.mockStatic(BackendServiceProxy.class)) { + envMock.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService); + backendProxyMock.when(BackendServiceProxy::getInstance).thenReturn(backendServiceProxy); + + handler.notifyBackendsToSyncTabletMeta(Arrays.asList(101L, 102L)); + } + + Mockito.verify(backendServiceProxy, Mockito.times(2)) + .syncTabletMeta(Mockito.any(), Mockito.any()); + } + + @Test + public void testNotifyBackendsToSyncTabletMetaSwallowsNonOkResponse() throws Exception { + CloudSchemaChangeHandler handler = new CloudSchemaChangeHandler(); + Backend aliveBackend1 = new Backend(1L, "be-1", 9050); + aliveBackend1.setAlive(true); + aliveBackend1.setBrpcPort(8060); + Backend aliveBackend2 = new Backend(2L, "be-2", 9050); + aliveBackend2.setAlive(true); + aliveBackend2.setBrpcPort(8061); + + SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class); + Mockito.when(systemInfoService.getAllBackendsByAllCluster()).thenReturn(ImmutableMap.of( + aliveBackend1.getId(), aliveBackend1, + aliveBackend2.getId(), aliveBackend2)); + + BackendServiceProxy backendServiceProxy = Mockito.mock(BackendServiceProxy.class); + Mockito.when(backendServiceProxy.syncTabletMeta(Mockito.any(), Mockito.any())) + .thenReturn(Futures.immediateFuture(InternalService.PSyncTabletMetaResponse.newBuilder() + .setStatus(org.apache.doris.proto.Types.PStatus.newBuilder() + .setStatusCode(TStatusCode.CANCELLED.getValue())) + .setFailedTablets(2) + .build())); + + try (MockedStatic envMock = Mockito.mockStatic(Env.class); + MockedStatic backendProxyMock = 
Mockito.mockStatic(BackendServiceProxy.class)) { + envMock.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService); + backendProxyMock.when(BackendServiceProxy::getInstance).thenReturn(backendServiceProxy); + + handler.notifyBackendsToSyncTabletMeta(Arrays.asList(101L, 102L)); + } + + Mockito.verify(backendServiceProxy, Mockito.times(2)) + .syncTabletMeta(Mockito.any(), Mockito.any()); + } + + @Test + public void testNotifyBackendsToSyncTabletMetaReturnsWhenBackendDiscoveryFails() throws Exception { + CloudSchemaChangeHandler handler = new CloudSchemaChangeHandler(); + SystemInfoService systemInfoService = Mockito.mock(SystemInfoService.class); + Mockito.when(systemInfoService.getAllBackendsByAllCluster()) + .thenThrow(new org.apache.doris.common.AnalysisException("backend discovery failed")); + + BackendServiceProxy backendServiceProxy = Mockito.mock(BackendServiceProxy.class); + + try (MockedStatic envMock = Mockito.mockStatic(Env.class); + MockedStatic backendProxyMock = Mockito.mockStatic(BackendServiceProxy.class)) { + envMock.when(Env::getCurrentSystemInfo).thenReturn(systemInfoService); + backendProxyMock.when(BackendServiceProxy::getInstance).thenReturn(backendServiceProxy); + + handler.notifyBackendsToSyncTabletMeta(Arrays.asList(101L, 102L)); + } + + Mockito.verifyNoInteractions(backendServiceProxy); + } + + private Cloud.UpdateTabletResponse okUpdateTabletResponse() { + return Cloud.UpdateTabletResponse.newBuilder() + .setStatus(Cloud.MetaServiceResponseStatus.newBuilder().setCode(Cloud.MetaServiceCode.OK)) + .build(); + } + + private InternalService.PSyncTabletMetaResponse okSyncTabletMetaResponse() { + return InternalService.PSyncTabletMetaResponse.newBuilder() + .setStatus(org.apache.doris.proto.Types.PStatus.newBuilder() + .setStatusCode(TStatusCode.OK.getValue())) + .setSyncedTablets(1) + .build(); + } +} diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 808a70e327a9ba..66d2eb1b5aae04 100644 --- 
a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -1071,6 +1071,17 @@ message PAlterVaultSyncRequest { message PAlterVaultSyncResponse { } +message PSyncTabletMetaRequest { + repeated int64 tablet_ids = 1; +} + +message PSyncTabletMetaResponse { + optional PStatus status = 1; + optional int64 synced_tablets = 2; + optional int64 skipped_tablets = 3; + optional int64 failed_tablets = 4; +} + message PGetBeResourceRequest { } @@ -1223,6 +1234,7 @@ service PBackendService { rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns (PFetchRemoteSchemaResponse); rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns (PJdbcTestConnectionResult); rpc alter_vault_sync(PAlterVaultSyncRequest) returns (PAlterVaultSyncResponse); + rpc sync_tablet_meta(PSyncTabletMetaRequest) returns (PSyncTabletMetaResponse); rpc get_be_resource(PGetBeResourceRequest) returns (PGetBeResourceResponse); rpc delete_dictionary(PDeleteDictionaryRequest) returns (PDeleteDictionaryResponse); rpc commit_refresh_dictionary(PCommitRefreshDictionaryRequest) returns (PCommitRefreshDictionaryResponse); @@ -1231,4 +1243,3 @@ service PBackendService { rpc fetch_peer_data(PFetchPeerDataRequest) returns (PFetchPeerDataResponse); rpc request_cdc_client(PRequestCdcClientRequest) returns (PRequestCdcClientResult); }; - diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_sync_tablet_meta_after_alter.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sync_tablet_meta_after_alter.groovy new file mode 100644 index 00000000000000..17bdeae2d12655 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_sync_tablet_meta_after_alter.groovy @@ -0,0 +1,284 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_cloud_sync_tablet_meta_after_alter", "nonConcurrent") { + if (!isCloudMode()) { + return + } + if (!getFeConfig("enable_debug_points").equalsIgnoreCase("true")) { + logger.info("enable_debug_points=false, skip") + return + } + if (context.config.multiClusterBes == null || context.config.multiClusterBes.trim().isEmpty()) { + logger.info("multiClusterBes is empty, skip") + return + } + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def cluster0 = "sync_meta_cluster0" + def cluster1 = "sync_meta_cluster1" + def clusterId0 = "sync_meta_cluster_id0" + def clusterId1 = "sync_meta_cluster_id1" + def dbName = "reg_sync_tablet_meta_db" + def tableName = "test_cloud_sync_tablet_meta_after_alter" + def versionLimitTableName = "test_cloud_sync_tablet_meta_version_limit_after_alter" + + List ipList = [] + List hbPortList = [] + List beUniqueIdList = [] + + String[] bes = context.config.multiClusterBes.split(',') + for (String values : bes) { + if (ipList.size() == 2) { + break + } + String[] beInfo = values.split(':') + ipList.add(beInfo[0]) + hbPortList.add(beInfo[1]) + beUniqueIdList.add(beInfo[3]) + } + assertTrue(ipList.size() >= 2, "need at least two backends in multiClusterBes") + + def getBackendForCluster = { String clusterName -> + def backend = sql_return_maparray("show backends").find { it.Tag.contains(clusterName) } + 
assert backend != null: "backend for cluster ${clusterName} not found" + return backend + } + + def getBvar = { backend, String name -> + def url = "http://${backend.Host}:${backend.BrpcPort}/vars/${name}" + def (code, out, err) = curl("GET", url) + assertEquals(0, code) + def matcher = (out =~ /(?m)^${java.util.regex.Pattern.quote(name)}\\s*:\\s*(\\d+)$/) + assertTrue(matcher.find(), "failed to parse bvar ${name} from ${url}, out=${out}, err=${err}") + return matcher.group(1).toLong() + } + + def waitForBvarIncrease = { backend, String name, long baseline, long delta -> + for (int retry = 0; retry < 20; retry++) { + def current = getBvar(backend, name) + logger.info("wait bvar ${name} on ${backend.Host}:${backend.BrpcPort}, baseline=${baseline}, current=${current}") + if (current >= baseline + delta) { + return current + } + sleep(500) + } + return getBvar(backend, name) + } + + def getCompactionPolicy = { String currentTableName -> + def tablets = sql_return_maparray("""show tablets from ${currentTableName};""") + assertEquals(1, tablets.size()) + def (code, out, err) = curl("GET", tablets[0].CompactionStatus) + assertEquals(0, code) + def tabletJson = parseJson(out.trim()) + return tabletJson["compaction policy"] + } + + def useCluster = { String clusterName -> + sql """use @${clusterName}""" + sql """use ${dbName}""" + } + + def cleanupClusters = { + for (uniqueId in beUniqueIdList) { + def resp = get_cluster.call(uniqueId) + for (cluster in resp) { + if (cluster.type == "COMPUTE") { + drop_cluster.call(cluster.cluster_name, cluster.cluster_id) + } + } + } + wait_cluster_change() + } + + setBeConfigTemporary([ + tablet_sync_interval_s: 18000, + schedule_sync_tablets_interval_s: 18000 + ]) { + try { + cleanupClusters.call() + add_cluster.call(beUniqueIdList[0], ipList[0], hbPortList[0], cluster0, clusterId0) + add_cluster.call(beUniqueIdList[1], ipList[1], hbPortList[1], cluster1, clusterId1) + wait_cluster_change() + + sql """use @${cluster0}""" + sql """create 
database if not exists ${dbName}""" + sql """use ${dbName}""" + sql """drop table if exists ${tableName} force""" + sql """drop table if exists ${versionLimitTableName} force""" + sql """ + CREATE TABLE ${tableName} ( + id INT NOT NULL, + value INT + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ) + """ + sql """insert into ${tableName} values (1, 10), (2, 20), (3, 30)""" + + def be0 = getBackendForCluster(cluster0) + def be1 = getBackendForCluster(cluster1) + + // Case 1: warm only cluster0, alter property, and verify cluster1 is notified but skipped. + useCluster.call(cluster0) + qt_case1_cluster0_warmup """select count(*) from ${tableName}""" + assertEquals("size_based", getCompactionPolicy(tableName)) + + def case1Synced0 = getBvar(be0, "cloud_sync_tablet_meta_synced_total") + def case1Skipped1 = getBvar(be1, "cloud_sync_tablet_meta_skipped_total") + + sql """alter table ${tableName} set ("compaction_policy" = "time_series")""" + + assertTrue(waitForBvarIncrease(be0, "cloud_sync_tablet_meta_synced_total", case1Synced0, 1) >= case1Synced0 + 1) + assertTrue(waitForBvarIncrease(be1, "cloud_sync_tablet_meta_skipped_total", case1Skipped1, 1) >= case1Skipped1 + 1) + + useCluster.call(cluster0) + qt_case1_cluster0_after_alter """select count(*) from ${tableName}""" + assertEquals("time_series", getCompactionPolicy(tableName)) + + useCluster.call(cluster1) + qt_case1_cluster1_first_access """select count(*) from ${tableName}""" + assertEquals("time_series", getCompactionPolicy(tableName)) + + // Case 2: warm both clusters, alter property again, and verify both clusters sync immediately. 
+ useCluster.call(cluster0) + qt_case2_cluster0_warmup """select count(*) from ${tableName}""" + useCluster.call(cluster1) + qt_case2_cluster1_warmup """select count(*) from ${tableName}""" + + def case2Synced0 = getBvar(be0, "cloud_sync_tablet_meta_synced_total") + def case2Synced1 = getBvar(be1, "cloud_sync_tablet_meta_synced_total") + + useCluster.call(cluster0) + sql """alter table ${tableName} set ("compaction_policy" = "size_based")""" + + assertTrue(waitForBvarIncrease(be0, "cloud_sync_tablet_meta_synced_total", case2Synced0, 1) >= case2Synced0 + 1) + assertTrue(waitForBvarIncrease(be1, "cloud_sync_tablet_meta_synced_total", case2Synced1, 1) >= case2Synced1 + 1) + + useCluster.call(cluster0) + assertEquals("size_based", getCompactionPolicy(tableName)) + useCluster.call(cluster1) + assertEquals("size_based", getCompactionPolicy(tableName)) + + // Case 3: hit the size_based version limit first, then alter to time_series and verify writes succeed immediately. + useCluster.call(cluster0) + sql """drop table if exists ${versionLimitTableName} force""" + sql """ + CREATE TABLE ${versionLimitTableName} ( + id BIGINT NOT NULL, + value BIGINT + ) ENGINE=OLAP + DUPLICATE KEY(id) + DISTRIBUTED BY HASH(id) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ) + """ + qt_case3_version_limit_warmup """select count(*) from ${versionLimitTableName}""" + assertEquals("size_based", getCompactionPolicy(versionLimitTableName)) + + int successfulBeforeAlter = 0 + String versionLimitFailure = null + for (int i = 0; i < 2400; i++) { + try { + sql """insert into ${versionLimitTableName} values (${i}, ${i})""" + successfulBeforeAlter++ + if ((i + 1) % 200 == 0) { + logger.info("inserted {} versions into {}", i + 1, versionLimitTableName) + } + } catch (Exception e) { + versionLimitFailure = e.getMessage() + logger.info("hit version limit after {} successful inserts, msg={}", + successfulBeforeAlter, versionLimitFailure) + break + } + } + 
assertTrue(versionLimitFailure != null, "expected version limit failure before alter") + assertTrue(versionLimitFailure.contains("-235") + || versionLimitFailure.toLowerCase().contains("too many versions") + || versionLimitFailure.toLowerCase().contains("version count") + || versionLimitFailure.toLowerCase().contains("exceed limit"), + "unexpected failure message: ${versionLimitFailure}") + + def versionLimitTablets = sql_return_maparray("""show tablets from ${versionLimitTableName};""") + assertEquals(1, versionLimitTablets.size()) + long versionCountBeforeAlter = versionLimitTablets[0].VersionCount.toLong() + assertTrue(versionCountBeforeAlter >= 2000, + "expected version count near max limit, actual=${versionCountBeforeAlter}") + + sql """alter table ${versionLimitTableName} set ("compaction_policy" = "time_series")""" + assertEquals("time_series", getCompactionPolicy(versionLimitTableName)) + + int successfulAfterAlter = 0 + for (int i = 0; i < 50; i++) { + long rowId = 100000L + i + sql """insert into ${versionLimitTableName} values (${rowId}, ${rowId})""" + successfulAfterAlter++ + } + assertEquals(50, successfulAfterAlter) + def versionLimitTabletsAfterAlter = sql_return_maparray("""show tablets from ${versionLimitTableName};""") + assertEquals(1, versionLimitTabletsAfterAlter.size()) + assertTrue(versionLimitTabletsAfterAlter[0].VersionCount.toLong() > versionCountBeforeAlter, + "expected version count to keep increasing immediately after alter") + qt_case3_version_limit_after_alter """select count(*) from ${versionLimitTableName}""" + + // Case 4: disable FE proactive notification and verify cached tablets remain stale. 
+ GetDebugPoint().enableDebugPointForAllFEs("CloudSchemaChangeHandler.notifyBackendsToSyncTabletMeta.skip") + def case4Request0 = getBvar(be0, "cloud_sync_tablet_meta_requests_total") + def case4Request1 = getBvar(be1, "cloud_sync_tablet_meta_requests_total") + def case4Synced0 = getBvar(be0, "cloud_sync_tablet_meta_synced_total") + def case4Synced1 = getBvar(be1, "cloud_sync_tablet_meta_synced_total") + def case4Skipped0 = getBvar(be0, "cloud_sync_tablet_meta_skipped_total") + def case4Skipped1 = getBvar(be1, "cloud_sync_tablet_meta_skipped_total") + + useCluster.call(cluster0) + sql """alter table ${tableName} set ("compaction_policy" = "time_series")""" + sleep(2000) + + assertEquals(case4Request0, getBvar(be0, "cloud_sync_tablet_meta_requests_total")) + assertEquals(case4Request1, getBvar(be1, "cloud_sync_tablet_meta_requests_total")) + assertEquals(case4Synced0, getBvar(be0, "cloud_sync_tablet_meta_synced_total")) + assertEquals(case4Synced1, getBvar(be1, "cloud_sync_tablet_meta_synced_total")) + assertEquals(case4Skipped0, getBvar(be0, "cloud_sync_tablet_meta_skipped_total")) + assertEquals(case4Skipped1, getBvar(be1, "cloud_sync_tablet_meta_skipped_total")) + + useCluster.call(cluster0) + assertEquals("size_based", getCompactionPolicy(tableName)) + useCluster.call(cluster1) + assertEquals("size_based", getCompactionPolicy(tableName)) + } finally { + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + try { + sql """use @${cluster0}""" + sql """drop table if exists ${dbName}.${tableName} force""" + sql """drop table if exists ${dbName}.${versionLimitTableName} force""" + } catch (Exception e) { + logger.info("drop table in finally failed: ${e.getMessage()}") + } + cleanupClusters.call() + } + } +}