Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.fluss.rpc.messages.RebalanceResponse;
import org.apache.fluss.rpc.messages.RemoveServerTagResponse;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.server.coordinator.event.AccessContextEvent;
import org.apache.fluss.server.coordinator.event.AddServerTagEvent;
import org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
Expand Down Expand Up @@ -342,8 +343,14 @@ private void initCoordinatorContext() throws Exception {
long start4loadTabletServer = System.currentTimeMillis();
Map<Integer, TabletServerRegistration> tabletServerRegistrations =
zooKeeperClient.getTabletServers(currentServers);
List<Integer> skippedNullRegistration = new ArrayList<>();
List<Integer> skippedNoEndpoint = new ArrayList<>();
for (int server : currentServers) {
TabletServerRegistration registration = tabletServerRegistrations.get(server);
if (registration == null) {
skippedNullRegistration.add(server);
continue;
}
ServerInfo serverInfo =
new ServerInfo(
server,
Expand All @@ -357,6 +364,7 @@ private void initCoordinatorContext() throws Exception {
"Can not find endpoint for listener name {} for tablet server {}",
internalListenerName,
serverInfo);
skippedNoEndpoint.add(server);
continue;
}
tabletServerInfos.add(serverInfo);
Expand All @@ -370,8 +378,30 @@ private void initCoordinatorContext() throws Exception {

coordinatorContext.setLiveTabletServers(tabletServerInfos);
LOG.info(
"Load tablet servers success in {}ms when initializing coordinator context.",
System.currentTimeMillis() - start4loadTabletServer);
"Load tablet servers success in {}ms when initializing coordinator context. "
+ "ZK returned {} servers, loaded {} into liveSet, "
+ "skipped {} (null registration), skipped {} (no endpoint). "
+ "Live server IDs: {}",
System.currentTimeMillis() - start4loadTabletServer,
currentServers.length,
tabletServerInfos.size(),
skippedNullRegistration.size(),
skippedNoEndpoint.size(),
tabletServerInfos.stream()
.map(s -> String.valueOf(s.id()))
.collect(Collectors.joining(",")));
if (!skippedNullRegistration.isEmpty()) {
LOG.warn(
"Skipped {} servers with null ZK registration: {}",
skippedNullRegistration.size(),
skippedNullRegistration);
}
if (!skippedNoEndpoint.isEmpty()) {
LOG.warn(
"Skipped {} servers with no internal endpoint: {}",
skippedNoEndpoint.size(),
skippedNoEndpoint);
}

// init tablet server channels
coordinatorChannelManager.startup(internalServerNodes);
Expand Down Expand Up @@ -938,11 +968,25 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent(
notifyLeaderAndIsrResponseReceivedEvent.getNotifyLeaderAndIsrResultForBuckets();
for (NotifyLeaderAndIsrResultForBucket notifyLeaderAndIsrResultForBucket :
notifyLeaderAndIsrResultForBuckets) {
// if the error code is not none, we will consider it as offline
if (notifyLeaderAndIsrResultForBucket.failed()) {
offlineReplicas.add(
new TableBucketReplica(
notifyLeaderAndIsrResultForBucket.getTableBucket(), serverId));
Errors error = notifyLeaderAndIsrResultForBucket.getError().error();
TableBucket tableBucket = notifyLeaderAndIsrResultForBucket.getTableBucket();
if (isFatalReplicaError(error)) {
LOG.warn(
"Fatal NotifyLeaderAndIsr error for bucket {} on server {}: {}. "
+ "Marking replica offline.",
tableBucket,
serverId,
notifyLeaderAndIsrResultForBucket.getError());
offlineReplicas.add(new TableBucketReplica(tableBucket, serverId));
} else {
LOG.warn(
"Transient NotifyLeaderAndIsr error for bucket {} on server {}: {}. "
+ "Replica remains online.",
tableBucket,
serverId,
notifyLeaderAndIsrResultForBucket.getError());
}
}
}
if (!offlineReplicas.isEmpty()) {
Expand All @@ -951,6 +995,18 @@ private void processNotifyLeaderAndIsrResponseReceivedEvent(
}
}

/**
 * Classifies an error code as a fatal replica failure or a transient one.
 *
 * <p>Only the storage-related codes ({@code STORAGE_EXCEPTION}, {@code LOG_STORAGE_EXCEPTION},
 * {@code KV_STORAGE_EXCEPTION}) and {@code UNKNOWN_SERVER_ERROR} count as fatal, i.e. as a
 * reason to exclude the replica from future leader elections. Every other value is treated as
 * transient and must NOT cause the replica to be marked offline.
 *
 * @param error the error code to classify
 * @return {@code true} if {@code error} is one of the fatal codes listed above, {@code false}
 *     otherwise
 */
private static boolean isFatalReplicaError(Errors error) {
    // Storage-level failures: generic, log and kv storage.
    if (error == Errors.STORAGE_EXCEPTION) {
        return true;
    }
    if (error == Errors.LOG_STORAGE_EXCEPTION) {
        return true;
    }
    if (error == Errors.KV_STORAGE_EXCEPTION) {
        return true;
    }
    // An unknown server-side error is also classified as fatal.
    return error == Errors.UNKNOWN_SERVER_ERROR;
}

private void onReplicaBecomeOffline(Set<TableBucketReplica> offlineReplicas) {
LOG.info("The replica {} become offline.", offlineReplicas);
for (TableBucketReplica offlineReplica : offlineReplicas) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.fluss.exception.InvalidColumnProjectionException;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidRequiredAcksException;
import org.apache.fluss.exception.LeaderNotAvailableException;
import org.apache.fluss.exception.LogOffsetOutOfRangeException;
import org.apache.fluss.exception.LogStorageException;
import org.apache.fluss.exception.NotLeaderOrFollowerException;
Expand Down Expand Up @@ -1188,17 +1189,16 @@ private void addFetcherForReplicas(
Integer leaderId = replica.getLeaderId();
TableBucket tb = replica.getTableBucket();
LogTablet logTablet = replica.getLogTablet();
if (leaderId == null) {
if (leaderId == null || leaderId < 0) {
result.put(
tb,
new NotifyLeaderAndIsrResultForBucket(
tb,
ApiError.fromThrowable(
new StorageException(
new LeaderNotAvailableException(
String.format(
"Could not find leader for follower replica %s while make "
+ "follower for %s.",
serverId, tb)))));
"No leader available for follower replica %s on server %s.",
tb, serverId)))));
} else {
bucketAndStatus.put(
tb,
Expand Down
Loading
Loading