Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ private boolean balanceAllPartitionsByPhase(ActiveSchedulePhase phase) {
// So infos can be empty even when balance work was done. Use indexBalanced (set to false by
// updateBalanceStatus inside balanceImpl when warmup moves succeed) to reflect the real state.
if (infos.isEmpty()) {
resetCloudBalanceMetric(StatType.PARTITION);
LOG.info("partition balance({}) done, infos empty (warmup or already balanced), indexBalanced={}",
phase, indexBalanced);
return indexBalanced;
Expand Down Expand Up @@ -733,6 +734,7 @@ private boolean balanceAllTablesByPhase(ActiveSchedulePhase phase) {
// Same as balanceAllPartitionsByPhase: in warmup mode infos stays empty even when
// warmup tasks were scheduled. Use tableBalanced to reflect the real state.
if (infos.isEmpty()) {
resetCloudBalanceMetric(StatType.TABLE);
LOG.info("table balance({}) done, infos empty (warmup or already balanced), tableBalanced={}",
phase, tableBalanced);
return tableBalanced;
Expand Down Expand Up @@ -777,6 +779,7 @@ public void globalBalance() {
balanceImpl(entry.getValue(), entry.getKey(), futureBeToTabletsGlobal, BalanceType.GLOBAL, infos);
}
if (infos.isEmpty()) {
resetCloudBalanceMetric(StatType.GLOBAL);
return;
}
long oldSize = infos.size();
Expand Down Expand Up @@ -2213,8 +2216,7 @@ private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<Up
String clusterId = entry.getKey();
notBalancedClusterIds.remove(clusterId);
List<UpdateCloudReplicaInfo> infoList = entry.getValue();
String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getClusterNameByClusterId(clusterId);
String clusterName = getClusterNameByClusterId(clusterId);
if (!Strings.isNullOrEmpty(clusterName)) {
MetricRepo.updateClusterCloudBalanceNum(clusterName, clusterId, type, infoList.size());
}
Expand Down Expand Up @@ -2256,8 +2258,7 @@ private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<Up
}

for (String clusterId : notBalancedClusterIds) {
String clusterName = ((CloudSystemInfoService) Env.getCurrentSystemInfo())
.getClusterNameByClusterId(clusterId);
String clusterName = getClusterNameByClusterId(clusterId);
if (!Strings.isNullOrEmpty(clusterName)) {
MetricRepo.updateClusterCloudBalanceNum(clusterName, clusterId, type, 0);
}
Expand All @@ -2270,6 +2271,28 @@ private List<UpdateCloudReplicaInfo> batchUpdateCloudReplicaInfoEditlogs(List<Up
return rets;
}

/**
 * Resolves the cluster name for a cluster id.
 *
 * <p>The {@code cloudSystemInfoService} field is used when it is non-null; otherwise the lookup
 * falls back to {@code Env.getCurrentSystemInfo()} cast to {@link CloudSystemInfoService}.
 *
 * @param clusterId id of the cluster to look up
 * @return the value returned by the service's {@code getClusterNameByClusterId}, or {@code null}
 *         when neither source yields a service
 */
private String getClusterNameByClusterId(String clusterId) {
    CloudSystemInfoService resolver = cloudSystemInfoService;
    if (resolver == null) {
        // No service set on this instance: fall back to the one exposed by Env.
        resolver = (CloudSystemInfoService) Env.getCurrentSystemInfo();
    }
    return resolver == null ? null : resolver.getClusterNameByClusterId(clusterId);
}

/**
 * Reports a balance count of zero for every cluster id in {@code clusterToBes}.
 *
 * <p>Does nothing when {@code clusterToBes} is null or empty. Cluster ids whose name resolves
 * to null or the empty string are skipped.
 *
 * @param type the stat type passed through to {@code MetricRepo.updateClusterCloudBalanceNum}
 */
private void resetCloudBalanceMetric(StatType type) {
    if (clusterToBes != null && !clusterToBes.isEmpty()) {
        for (String id : clusterToBes.keySet()) {
            String name = getClusterNameByClusterId(id);
            if (Strings.isNullOrEmpty(name)) {
                // No usable name for this id, so there is no metric to update.
                continue;
            }
            MetricRepo.updateClusterCloudBalanceNum(name, id, type, 0);
        }
    }
}

/**
 * Reports the current value of the {@code inited} field.
 *
 * @return the value of {@code inited}
 */
public boolean isInited() {
return inited;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@

package org.apache.doris.cloud.catalog;

import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.Config;
import org.apache.doris.metric.MetricRepo;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -298,4 +302,30 @@ public void testMigrateTabletsForSmoothUpgrade_emptyQueueReturnsFalse() throws E
boolean migrated = invokePrivate(r, "migrateTabletsForSmoothUpgrade", new Class<?>[] {}, new Object[] {});
Assertions.assertFalse(migrated);
}

/**
 * Checks that {@code resetCloudBalanceMetric} pushes a zero balance count to {@link MetricRepo}
 * for each cluster present in {@code clusterToBes}, using the name resolved by the injected
 * {@link CloudSystemInfoService}.
 */
@Test
public void testResetCloudBalanceMetric_clearsMetricForAllClusters() throws Exception {
    // Arrange: a mocked id -> name lookup for two clusters.
    CloudSystemInfoService mockedInfoService = Mockito.mock(CloudSystemInfoService.class);
    Mockito.when(mockedInfoService.getClusterNameByClusterId("cluster-a")).thenReturn("compute_cluster_a");
    Mockito.when(mockedInfoService.getClusterNameByClusterId("cluster-b")).thenReturn("compute_cluster_b");

    Map<String, List<Long>> besByCluster = new HashMap<>();
    besByCluster.put("cluster-a", Collections.singletonList(1L));
    besByCluster.put("cluster-b", Collections.singletonList(2L));

    TestRebalancer rebalancer = new TestRebalancer();
    setField(rebalancer, "cloudSystemInfoService", mockedInfoService);
    setField(rebalancer, "clusterToBes", besByCluster);

    Class<?>[] paramTypes = {CloudTabletRebalancer.StatType.class};
    Object[] callArgs = {CloudTabletRebalancer.StatType.PARTITION};

    // Act + assert: static MetricRepo calls are intercepted only inside this scope.
    try (MockedStatic<MetricRepo> mockedMetricRepo = Mockito.mockStatic(MetricRepo.class)) {
        invokePrivate(rebalancer, "resetCloudBalanceMetric", paramTypes, callArgs);

        mockedMetricRepo.verify(() -> MetricRepo.updateClusterCloudBalanceNum(
                "compute_cluster_a", "cluster-a", CloudTabletRebalancer.StatType.PARTITION, 0L));
        mockedMetricRepo.verify(() -> MetricRepo.updateClusterCloudBalanceNum(
                "compute_cluster_b", "cluster-b", CloudTabletRebalancer.StatType.PARTITION, 0L));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ suite('test_balance_use_compute_group_properties', 'docker') {
logger.info("after add be balance every cluster cache {}", afterBalanceEveryClusterCache)

// assert first map keys
def assertFirstMapKeys = { clusterRet, expectedEqual ->
def assertFirstMapKeys = { clusterName, clusterRet, expectedEqual ->
def firstMap = clusterRet[0]
def keys = firstMap.keySet().toList()
logger.info("debug: clusterName {} keys {}", clusterName, keys)
Expand All @@ -190,22 +190,22 @@ suite('test_balance_use_compute_group_properties', 'docker') {
def global_config_cluster_ret = afterBalanceEveryClusterCache[global_config_cluster]
logger.info("global_config_cluster_ret {}", global_config_cluster_ret)
// fe tablets not changed
assertFirstMapKeys(global_config_cluster_ret, true)
assertFirstMapKeys(global_config_cluster, global_config_cluster_ret, true)

def without_warmup_cluster_ret = afterBalanceEveryClusterCache[without_warmup_cluster]
logger.info("without_warmup_cluster_ret {}", without_warmup_cluster_ret)
// fe tablets have changed
assertFirstMapKeys(without_warmup_cluster_ret, false)
assertFirstMapKeys(without_warmup_cluster, without_warmup_cluster_ret, false)

def async_warmup_cluster_ret = afterBalanceEveryClusterCache[async_warmup_cluster]
logger.info("async_warmup_cluster_ret {}", async_warmup_cluster_ret)
// fe tablets have changed, due to task timeout
assertFirstMapKeys(async_warmup_cluster_ret, false)
assertFirstMapKeys(async_warmup_cluster, async_warmup_cluster_ret, false)

def sync_warmup_cluster_ret = afterBalanceEveryClusterCache[sync_warmup_cluster]
logger.info("sync_warmup_cluster_ret {}", sync_warmup_cluster_ret)
// fe tablets not changed
assertFirstMapKeys(sync_warmup_cluster_ret, true)
assertFirstMapKeys(sync_warmup_cluster, sync_warmup_cluster_ret, true)

logger.info("success check after balance every cluster cache, cluster's balance type is worked")
}
Expand Down
Loading