Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,13 @@ public class AmoroManagementConf {
.withDescription(
"Interval for bucket assignment service to detect node changes and redistribute bucket IDs.");

public static final ConfigOption<Duration> HA_BUCKET_TABLE_SYNC_INTERVAL =
ConfigOptions.key("ha.bucket-table-sync.interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription(
"Interval for syncing tables assigned to bucket IDs in master-slave mode. Each node periodically loads tables from database based on its assigned bucket IDs.");

public static final ConfigOption<Integer> TABLE_SERVICE_THRIFT_BIND_PORT =
ConfigOptions.key("thrift-server.table-service.bind-port")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,17 @@ public void startOptimizingService() throws Exception {

DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory();
defaultRuntimeFactory.initialize(processFactories);
// In master-slave mode, create AmsAssignService for bucket assignment

BucketAssignStore bucketAssignStore = null;
if (IS_MASTER_SLAVE_MODE && haContainer != null) {
bucketAssignStore = BucketAssignStoreFactory.create(haContainer, serviceConfig);
}

// In master-slave mode, create AmsAssignService for bucket assignment (shares BucketAssignStore
// with DefaultTableService).
if (IS_MASTER_SLAVE_MODE && haContainer != null && bucketAssignStore != null) {
try {
// Create and start AmsAssignService for bucket assignment
// The factory will handle different HA types (ZK, database, etc.)
amsAssignService = new AmsAssignService(haContainer, serviceConfig);
amsAssignService = new AmsAssignService(haContainer, serviceConfig, bucketAssignStore);
amsAssignService.start();
LOG.info("AmsAssignService started for master-slave mode");
} catch (UnsupportedOperationException e) {
Expand All @@ -267,7 +272,9 @@ public void startOptimizingService() throws Exception {

List<ActionCoordinator> actionCoordinators = defaultRuntimeFactory.supportedCoordinators();

tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory);
tableService =
new DefaultTableService(
serviceConfig, catalogManager, defaultRuntimeFactory, haContainer, bucketAssignStore);
processService = new ProcessService(tableService, actionCoordinators, executeEngineManager);
optimizingService =
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ boolean isRunning() {
}

public AmsAssignService(HighAvailabilityContainer haContainer, Configurations serviceConfig) {
this(haContainer, serviceConfig, null);
}

/**
* @param assignStore if non-null, used as the bucket assignment store; otherwise one is created
* via {@link BucketAssignStoreFactory} (same instance can be shared with {@code
* DefaultTableService}).
*/
public AmsAssignService(
HighAvailabilityContainer haContainer,
Configurations serviceConfig,
BucketAssignStore assignStore) {
this.haContainer = haContainer;
this.serviceConfig = serviceConfig;
this.bucketIdTotalCount =
Expand All @@ -75,7 +87,10 @@ public AmsAssignService(HighAvailabilityContainer haContainer, Configurations se
serviceConfig.get(AmoroManagementConf.HA_NODE_OFFLINE_TIMEOUT).toMillis();
this.assignIntervalSeconds =
serviceConfig.get(AmoroManagementConf.HA_ASSIGN_INTERVAL).getSeconds();
this.assignStore = BucketAssignStoreFactory.create(haContainer, serviceConfig);
this.assignStore =
assignStore != null
? assignStore
: BucketAssignStoreFactory.create(haContainer, serviceConfig);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,11 @@ public boolean hasLeadership() {
return isLeader.get();
}

@Override
public AmsServerInfo getTableServiceServerInfo() {
return tableServiceServerInfo;
}

/** Closes the heartbeat executor safely. */
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,11 @@ public interface HighAvailabilityContainer {
* @return true if the current AMS node is the primary node, false otherwise
*/
boolean hasLeadership();

/**
* Get current AMS node information.
*
* @return {@link AmsServerInfo}
*/
AmsServerInfo getTableServiceServerInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,9 @@ public List<AmsServerInfo> getAliveNodes() {
public boolean hasLeadership() {
return false;
}

@Override
public AmsServerInfo getTableServiceServerInfo() {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.persistence;

/** Simple class to hold bucketId and its table count. */
public class BucketIdCount {
private String bucketId;
private Integer count;

public String getBucketId() {
return bucketId;
}

public void setBucketId(String bucketId) {
this.bucketId = bucketId;
}

public Integer getCount() {
return count;
}

public void setCount(Integer count) {
this.count = count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.amoro.server.persistence.mapper;

import org.apache.amoro.server.persistence.BucketIdCount;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.TableRuntimeState;
import org.apache.ibatis.annotations.Delete;
Expand Down Expand Up @@ -59,6 +60,17 @@ public interface TableRuntimeMapper {
+ " WHERE table_id = #{tableId}")
int updateRuntime(TableRuntimeMeta meta);

/**
* Sets bucket_id only when it is still null. Avoids overwriting status/config/summary with stale
* snapshots from a prior read.
*/
@Update(
"UPDATE "
+ TABLE_NAME
+ " SET bucket_id = #{bucketId, jdbcType=VARCHAR} "
+ "WHERE table_id = #{tableId} AND bucket_id IS NULL")
int updateBucketIdIfNull(@Param("tableId") Long tableId, @Param("bucketId") String bucketId);

/* ---------- delete ---------- */
@Delete("DELETE FROM " + TABLE_NAME + " WHERE table_id = #{tableId}")
int deleteRuntime(@Param("tableId") Long tableId);
Expand Down Expand Up @@ -102,6 +114,29 @@ public interface TableRuntimeMapper {
@ResultMap("tableRuntimeMeta")
List<TableRuntimeMeta> selectAllRuntimes();

@Select(
"<script>"
+ "SELECT "
+ SELECT_COLS
+ "FROM "
+ TABLE_NAME
+ " WHERE bucket_id IN "
+ "<foreach item='item' collection='bucketIds' open='(' separator=',' close=')'>"
+ "#{item}"
+ "</foreach>"
+ "<if test='includeNullBucketId'> OR bucket_id IS NULL </if>"
+ "</script>")
/**
* Select runtimes by bucket ids.
*
* @param includeNullBucketId false = only rows with bucket_id in list (master-slave); true = also
* include bucket_id IS NULL (e.g. for non-master-slave compatibility)
*/
@ResultMap("tableRuntimeMeta")
List<TableRuntimeMeta> selectRuntimesByBucketIds(
@Param("bucketIds") List<String> bucketIds,
@Param("includeNullBucketId") boolean includeNullBucketId);

@Select(
"<script>"
+ "<bind name=\"isMySQL\" value=\"_databaseId == 'mysql'\" />"
Expand Down Expand Up @@ -180,4 +215,19 @@ void saveState(

@Delete("DELETE FROM " + STATE_TABLE_NAME + " WHERE table_id = #{tableId}")
void removeAllTableStates(@Param("tableId") long tableId);

/**
* Count tables per bucketId. Returns a map where key is bucketId and value is the count of tables
* for that bucketId. Only counts non-null and non-empty bucketIds.
*/
@Select(
"SELECT bucket_id, COUNT(*) as table_count FROM "
+ TABLE_NAME
+ " WHERE bucket_id IS NOT NULL AND bucket_id != '' "
+ "GROUP BY bucket_id")
@Results({
@Result(column = "bucket_id", property = "bucketId"),
@Result(column = "table_count", property = "count")
})
List<BucketIdCount> countTablesByBucketId();
}
Loading
Loading