diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java index 50cb79fb616f77..d95b90f899306c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudTabletRebalancer.java @@ -684,6 +684,7 @@ private boolean balanceAllPartitionsByPhase(ActiveSchedulePhase phase) { // So infos can be empty even when balance work was done. Use indexBalanced (set to false by // updateBalanceStatus inside balanceImpl when warmup moves succeed) to reflect the real state. if (infos.isEmpty()) { + resetCloudBalanceMetric(StatType.PARTITION); LOG.info("partition balance({}) done, infos empty (warmup or already balanced), indexBalanced={}", phase, indexBalanced); return indexBalanced; @@ -733,6 +734,7 @@ private boolean balanceAllTablesByPhase(ActiveSchedulePhase phase) { // Same as balanceAllPartitionsByPhase: in warmup mode infos stays empty even when // warmup tasks were scheduled. Use tableBalanced to reflect the real state. if (infos.isEmpty()) { + resetCloudBalanceMetric(StatType.TABLE); LOG.info("table balance({}) done, infos empty (warmup or already balanced), tableBalanced={}", phase, tableBalanced); return tableBalanced; @@ -777,6 +779,7 @@ public void globalBalance() { balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL, infos); } if (infos.isEmpty()) { + resetCloudBalanceMetric(StatType.GLOBAL); return; } long oldSize = infos.size(); @@ -2213,8 +2216,7 @@ private List batchUpdateCloudReplicaInfoEditlogs(List infoList = entry.getValue(); - String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo()) - .getClusterNameByClusterId(clusterId); + String clusterName = getClusterNameByClusterId(clusterId); if (!Strings.isNullOrEmpty(clusterName)) { MetricRepo.updateClusterCloudBalanceNum(clusterName, clusterId, type, infoList.size()); } @@ -2256,8 +2258,7 @@ private List batchUpdateCloudReplicaInfoEditlogs(List batchUpdateCloudReplicaInfoEditlogs(List[] {}, new Object[] {}); Assertions.assertFalse(migrated); } + + @Test + public void testResetCloudBalanceMetric_clearsMetricForAllClusters() throws Exception { + CloudSystemInfoService systemInfoService = Mockito.mock(CloudSystemInfoService.class); + TestRebalancer r = new TestRebalancer(); + setField(r, "cloudSystemInfoService", systemInfoService); + + Map> clusterToBes = new HashMap<>(); + clusterToBes.put("cluster-a", Collections.singletonList(1L)); + clusterToBes.put("cluster-b", Collections.singletonList(2L)); + setField(r, "clusterToBes", clusterToBes); + + Mockito.when(systemInfoService.getClusterNameByClusterId("cluster-a")).thenReturn("compute_cluster_a"); + Mockito.when(systemInfoService.getClusterNameByClusterId("cluster-b")).thenReturn("compute_cluster_b"); + + try (MockedStatic metricRepo = Mockito.mockStatic(MetricRepo.class)) { + invokePrivate(r, "resetCloudBalanceMetric", + new Class[] {CloudTabletRebalancer.StatType.class}, + new Object[] {CloudTabletRebalancer.StatType.PARTITION}); + + metricRepo.verify(() -> MetricRepo.updateClusterCloudBalanceNum( + "compute_cluster_a", "cluster-a", CloudTabletRebalancer.StatType.PARTITION, 0L)); + metricRepo.verify(() -> MetricRepo.updateClusterCloudBalanceNum( + "compute_cluster_b", "cluster-b", CloudTabletRebalancer.StatType.PARTITION, 0L)); + } + } } diff --git a/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy b/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy index 5a229088d2803d..531f8aeb8ad08a 100644 --- a/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy +++ b/regression-test/suites/cloud_p0/balance/test_balance_use_compute_group_properties.groovy @@ -174,7 +174,7 @@ suite('test_balance_use_compute_group_properties', 'docker') { logger.info("after add be balance every cluster cache {}", afterBalanceEveryClusterCache) // assert first map keys - def assertFirstMapKeys = { clusterRet, expectedEqual -> + def assertFirstMapKeys = { clusterName, clusterRet, expectedEqual -> def firstMap = clusterRet[0] def keys = firstMap.keySet().toList() logger.info("debug: clusterName {} keys {}", clusterName, keys) @@ -190,22 +190,22 @@ suite('test_balance_use_compute_group_properties', 'docker') { def global_config_cluster_ret = afterBalanceEveryClusterCache[global_config_cluster] logger.info("global_config_cluster_ret {}", global_config_cluster_ret) // fe tablets not changed - assertFirstMapKeys(global_config_cluster_ret, true) + assertFirstMapKeys(global_config_cluster, global_config_cluster_ret, true) def without_warmup_cluster_ret = afterBalanceEveryClusterCache[without_warmup_cluster] logger.info("without_warmup_cluster_ret {}", without_warmup_cluster_ret) // fe tablets has changed - assertFirstMapKeys(without_warmup_cluster_ret, false) + assertFirstMapKeys(without_warmup_cluster, without_warmup_cluster_ret, false) def async_warmup_cluster_ret = afterBalanceEveryClusterCache[async_warmup_cluster] logger.info("async_warmup_cluster_ret {}", async_warmup_cluster_ret) // fe tablets has changed, due to task timeout - assertFirstMapKeys(async_warmup_cluster_ret, false) + assertFirstMapKeys(async_warmup_cluster, async_warmup_cluster_ret, false) def sync_warmup_cluster_ret = afterBalanceEveryClusterCache[sync_warmup_cluster] logger.info("sync_warmup_cluster_ret {}", sync_warmup_cluster_ret) // fe tablets not changed - assertFirstMapKeys(sync_warmup_cluster_ret, true) + assertFirstMapKeys(sync_warmup_cluster, sync_warmup_cluster_ret, true) logger.info("success check after balance every cluster cache, cluster's balance type is worked") }