Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.util.ServerKeywordExecutable;
import org.apache.accumulo.server.util.adminCommand.SystemCheck.CheckCommandOpts;
import org.apache.accumulo.server.util.checkCommand.BulkLoadedFateCheckRunner;
import org.apache.accumulo.server.util.checkCommand.CheckRunner;
import org.apache.accumulo.server.util.checkCommand.MetadataTableCheckRunner;
import org.apache.accumulo.server.util.checkCommand.RootMetadataCheckRunner;
Expand Down Expand Up @@ -112,6 +113,9 @@ public enum Check {
"Checks that files in system tablet metadata exist in DFS",
Collections.singletonList(ROOT_TABLE)),
USER_FILES(UserFilesCheckRunner::new, "Checks that files in user tablet metadata exist in DFS",
Collections.singletonList(METADATA_TABLE)),
BULK_FATE(BulkLoadedFateCheckRunner::new,
"Checks that loaded columns in tablet metadata reference live fate operations",
Collections.singletonList(METADATA_TABLE));

private final Supplier<CheckRunner> checkRunner;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.server.util.checkCommand;

import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.accumulo.core.cli.ServerOpts;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.SelectedFiles;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.util.adminCommand.SystemCheck.Check;
import org.apache.accumulo.server.util.adminCommand.SystemCheck.CheckStatus;

public class BulkLoadedFateCheckRunner implements CheckRunner {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is actually finding dead fate ids related compactions and bulk imports, so could make the class name more general.

private static final Check check = Check.BULK_FATE;

@Override
public CheckStatus runCheck(ServerContext context, ServerOpts opts, boolean fixFiles)
throws Exception {
CheckStatus status = CheckStatus.OK;
printRunning();

log.trace("********** Checking for orphaned bulk-import loaded columns **********");

Copy link
Contributor

@keith-turner keith-turner Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code has race condition where a bulk import may start after creating liveFateIds and before scanning the metadata table. To fix this can do an initial scan of the metadata table here, this scan would be similar to the later scans but it would only collect fate ids.

final Set<FateId> initialMetadataIds = new HashSet<>();
// TODO scan metadata and find all fate ids

This set can be used in the second scan of the metadata table to avoid race conditions.

final Set<FateId> liveFateIds = new HashSet<>();
try (
MetaFateStore<BulkLoadedFateCheckRunner> mfs =
new MetaFateStore<>(context.getZooSession(), null, null);
UserFateStore<BulkLoadedFateCheckRunner> ufs =
new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null)) {

mfs.list().map(ReadOnlyFateStore.FateIdStatus::getFateId).forEach(liveFateIds::add);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is another list method that takes an enum set. Would be better to use that and pass in EnumSet.of(IN_PROGRESS,FAILED_IN_PROGRESS,SUBMITTED) to only see things that are currently running or about to run. Would not expect to see a finished fate operation to have an id in the metadata table.

Also we could filter to the fate operations to only table compactions and bulk imports. This is not needed for correctness, just pull less stuff into memory.

Suggested change
mfs.list().map(ReadOnlyFateStore.FateIdStatus::getFateId).forEach(liveFateIds::add);
mfs.list(EnumSet.of(IN_PROGRESS,FAILED_IN_PROGRESS,SUBMITTED)).filter(// TODO filter on operations of type TABLE_COMPACT or BULK_IMPORT) map(ReadOnlyFateStore.FateIdStatus::getFateId).forEach(liveFateIds::add);

ufs.list().map(ReadOnlyFateStore.FateIdStatus::getFateId).forEach(liveFateIds::add);
}

log.trace("Found {} live FATE operations", liveFateIds.size());

try (
TabletsMetadata tabletsMetadata =
TabletsMetadata
.builder(context).scanMetadataTable().fetch(TabletMetadata.ColumnType.LOADED,
Comment on lines +65 to +66
Copy link
Contributor

@keith-turner keith-turner Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use ample off the context. Also need to read from all three data levels. So would need to refactor this code to loop over the enums in Ample.DataLevel and scan the metadata table for each level.

context.getAmple().readTablets().forLevel(Ample.DataLevel.USER).fetch(TabletMetadata.ColumnType.LOADED,

TabletMetadata.ColumnType.SELECTED, TabletMetadata.ColumnType.PREV_ROW)
.build()) {

for (TabletMetadata tablet : tabletsMetadata) {

// Check loaded columns
Map<StoredTabletFile,FateId> loaded = tablet.getLoaded();
for (Map.Entry<StoredTabletFile,FateId> entry : loaded.entrySet()) {
FateId fateId = entry.getValue();
if (!liveFateIds.contains(fateId)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can add this check here and few lines down to avoid the race condition.

Suggested change
if (!liveFateIds.contains(fateId)) {
// The fate id exist in the metadata table before and after scanning the fate table, so it must be a dead fateId.
if (!liveFateIds.contains(fateId) && initialMetadataIds.contains(fateId)) {

log.warn(
"Tablet {} has loaded column for file {} referencing dead FATE op {} - "
+ "investigate and clean up manually",
tablet.getExtent(), entry.getKey().getMetadataPath(), fateId);
status = CheckStatus.FAILED;
}
}

// Check selected columns
SelectedFiles selectedFiles = tablet.getSelectedFiles();
if (selectedFiles != null) {
FateId fateId = selectedFiles.getFateId();
if (!liveFateIds.contains(fateId)) {
log.warn("Tablet {} has selected column referencing dead FATE op {} - "
+ "investigate and clean up manually", tablet.getExtent(), fateId);
status = CheckStatus.FAILED;
}
}
}
}

printCompleted(status);
return status;
}

@Override
public Check getCheck() {
return check;
}
}
108 changes: 98 additions & 10 deletions test/src/main/java/org/apache/accumulo/test/SystemCheckIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Supplier;
import java.util.regex.Pattern;

Expand All @@ -45,15 +46,24 @@
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.lock.ServiceLockPaths;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
import org.apache.accumulo.core.metadata.schema.SelectedFiles;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.time.SteadyTime;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.util.adminCommand.SystemCheck;
Expand Down Expand Up @@ -199,7 +209,8 @@ public void testAdminCheckRunNoCheckFailures() throws Exception {
+ "Running dummy check ROOT_TABLE\nDummy check ROOT_TABLE completed with status OK\n"
+ "Running dummy check METADATA_TABLE\nDummy check METADATA_TABLE completed with status OK\n"
+ "Running dummy check SYSTEM_FILES\nDummy check SYSTEM_FILES completed with status OK\n"
+ "Running dummy check USER_FILES\nDummy check USER_FILES completed with status OK";
+ "Running dummy check USER_FILES\nDummy check USER_FILES completed with status OK\n"
+ "Running dummy check BULK_FATE\nDummy check BULK_FATE completed with status OK\n";
String expRunSubRunOrder =
"Running dummy check ROOT_TABLE\nDummy check ROOT_TABLE completed with status OK\n"
+ "Running dummy check SYSTEM_FILES\nDummy check SYSTEM_FILES completed with status OK\n"
Expand All @@ -208,11 +219,11 @@ public void testAdminCheckRunNoCheckFailures() throws Exception {
// printed table allowing us to ensure the table only includes what is expected
String expRunAllStatusInfo =
"-SYSTEM_CONFIG|OKSERVER_CONFIG|OKTABLE_LOCKS|OKROOT_METADATA|OKROOT_TABLE|OK"
+ "METADATA_TABLE|OKSYSTEM_FILES|OKUSER_FILES|OK-";
+ "METADATA_TABLE|OKSYSTEM_FILES|OKUSER_FILES|OKBULK_FATE|OK-";
String expRunSubStatusInfo =
"-SYSTEM_CONFIG|FILTERED_OUTSERVER_CONFIG|FILTERED_OUTTABLE_LOCKS|FILTERED_OUT"
+ "ROOT_METADATA|FILTERED_OUTROOT_TABLE|OKMETADATA_TABLE|FILTERED_OUT"
+ "SYSTEM_FILES|OKUSER_FILES|OK-";
+ "SYSTEM_FILES|OKUSER_FILES|OKBULK_FATE|FILTERED_OUT-";

assertTrue(out1.contains(expRunAllRunOrder));
assertTrue(out2.contains(expRunAllRunOrder));
Expand Down Expand Up @@ -241,10 +252,12 @@ public void testAdminCheckRunNoCheckFailures() throws Exception {
public void testAdminCheckRunWithCheckFailures() throws Exception {
// tests running checks with some failing

boolean[] rootTableFails = new boolean[] {true, true, true, true, false, true, true, true};
boolean[] systemConfigFails = new boolean[] {false, true, true, true, true, true, true, true};
boolean[] rootTableFails =
new boolean[] {true, true, true, true, false, true, true, true, true};
boolean[] systemConfigFails =
new boolean[] {false, true, true, true, true, true, true, true, true};
boolean[] userFilesAndMetadataTableFails =
new boolean[] {true, true, true, true, true, false, true, false};
new boolean[] {true, true, true, true, true, false, true, false, true};

// run all checks with ROOT_TABLE failing: only SYSTEM_CONFIG and ROOT_METADATA should pass
// the rest should be filtered out as skipped due to dependency failure
Expand All @@ -269,7 +282,7 @@ public void testAdminCheckRunWithCheckFailures() throws Exception {
+ "Running dummy check SERVER_CONFIG\nDummy check SERVER_CONFIG completed with status OK\n"
+ "Running dummy check TABLE_LOCKS\nDummy check TABLE_LOCKS completed with status OK\n"
+ "Running dummy check ROOT_METADATA\nDummy check ROOT_METADATA completed with status OK\n"
+ "Running dummy check ROOT_TABLE\nDummy check ROOT_TABLE completed with status FAILED";
+ "Running dummy check ROOT_TABLE\nDummy check ROOT_TABLE completed with status FAILED\n";
String expRunOrder2 =
"Running dummy check SYSTEM_CONFIG\nDummy check SYSTEM_CONFIG completed with status FAILED\n";
String expRunOrder3And4 =
Expand All @@ -295,15 +308,15 @@ public void testAdminCheckRunWithCheckFailures() throws Exception {

String expStatusInfo1 = "-SYSTEM_CONFIG|OKSERVER_CONFIG|OKTABLE_LOCKS|OKROOT_METADATA|OK"
+ "ROOT_TABLE|FAILEDMETADATA_TABLE|SKIPPED_DEPENDENCY_FAILED"
+ "SYSTEM_FILES|SKIPPED_DEPENDENCY_FAILEDUSER_FILES|SKIPPED_DEPENDENCY_FAILED-";
+ "SYSTEM_FILES|SKIPPED_DEPENDENCY_FAILEDUSER_FILES|SKIPPED_DEPENDENCY_FAILEDBULK_FATE|SKIPPED_DEPENDENCY_FAILED-";
String expStatusInfo2 = "-SYSTEM_CONFIG|FAILEDSERVER_CONFIG|SKIPPED_DEPENDENCY_FAILED"
+ "TABLE_LOCKS|SKIPPED_DEPENDENCY_FAILEDROOT_METADATA|SKIPPED_DEPENDENCY_FAILED"
+ "ROOT_TABLE|SKIPPED_DEPENDENCY_FAILEDMETADATA_TABLE|SKIPPED_DEPENDENCY_FAILED"
+ "SYSTEM_FILES|SKIPPED_DEPENDENCY_FAILEDUSER_FILES|SKIPPED_DEPENDENCY_FAILED-";
+ "SYSTEM_FILES|SKIPPED_DEPENDENCY_FAILEDUSER_FILES|SKIPPED_DEPENDENCY_FAILEDBULK_FATE|SKIPPED_DEPENDENCY_FAILED-";
String expStatusInfo3And4 =
"-SYSTEM_CONFIG|OKSERVER_CONFIG|FILTERED_OUTTABLE_LOCKS|FILTERED_OUT"
+ "ROOT_METADATA|FILTERED_OUTROOT_TABLE|OKMETADATA_TABLE|FILTERED_OUT"
+ "SYSTEM_FILES|FILTERED_OUTUSER_FILES|FAILED";
+ "SYSTEM_FILES|FILTERED_OUTUSER_FILES|FAILEDBULK_FATE|FILTERED_OUT-";

assertTrue(out1.contains(expStatusInfo1));
assertTrue(out2.contains(expStatusInfo2));
Expand Down Expand Up @@ -640,6 +653,68 @@ public void testServerConfigCheck() throws Exception {
// no simple way to test for a failure case
}

@Test
public void testBulkFateCheck() throws Exception {
  // Exercises the BULK_FATE check: OK on a healthy table, FAILED once tablet metadata
  // references fate ids that no longer correspond to live fate operations.
  final Check bulkFateCheck = Check.BULK_FATE;
  final String tableName = getUniqueNames(1)[0];

  try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) {
    client.tableOperations().create(tableName);
    ReadWriteIT.ingest(client, 10, 10, 10, 0, tableName);
    client.tableOperations().flush(tableName, null, null, true);

    // Healthy state: the check should report OK and nothing else should run.
    var proc = getCluster().exec(SystemCheck.class, "run", bulkFateCheck.name());
    assertEquals(0, proc.getProcess().waitFor());
    String output = proc.readStdOut();
    assertTrue(output.contains("Check BULK_FATE completed with status OK"));
    assertNoOtherChecksRan(output, false, bulkFateCheck);

    final ServerContext context = getCluster().getServerContext();
    final TableId tableId = TableId.of(context.tableOperations().tableIdMap().get(tableName));

    // Grab the extent of the table's first tablet so its metadata can be mutated directly.
    KeyExtent extent;
    try (var tablets = context.getAmple().readTablets().forTable(tableId)
        .fetch(TabletMetadata.ColumnType.PREV_ROW).build()) {
      extent = tablets.iterator().next().getExtent();
    }

    // Write a loaded column referencing a fate id with no backing fate operation.
    FateId deadLoadedFateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
    ReferencedTabletFile ghostFile = ReferencedTabletFile.of(new org.apache.hadoop.fs.Path(
        "hdfs://localhost:8020/accumulo/tables/" + tableId + "/t-0000/ghost-bulk.rf"));
    context.getAmple().mutateTablet(extent).putFile(ghostFile, new DataFileValue(100, 10))
        .putBulkFile(ghostFile, deadLoadedFateId).mutate();

    // The dead loaded reference should cause the check to fail.
    proc = getCluster().exec(SystemCheck.class, "run", bulkFateCheck.name());
    assertEquals(1, proc.getProcess().waitFor());
    output = proc.readStdOut();
    assertTrue(output.contains("has loaded column for file"));
    assertTrue(output.contains("referencing dead FATE op"));
    assertTrue(output.contains("investigate and clean up manually"));
    assertTrue(output.contains("Check BULK_FATE completed with status FAILED"));
    assertNoOtherChecksRan(output, false, bulkFateCheck);

    // Remove the loaded column, then write a selected column with another dead fate id.
    context.getAmple().mutateTablet(extent).deleteFile(ghostFile.insert())
        .deleteBulkFile(ghostFile.insert()).mutate();

    FateId deadSelectedFateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
    StoredTabletFile storedGhost = ghostFile.insert();
    SelectedFiles orphanedSelected = new SelectedFiles(Set.of(storedGhost), true,
        deadSelectedFateId, SteadyTime.from(1, java.util.concurrent.TimeUnit.NANOSECONDS));
    context.getAmple().mutateTablet(extent).putSelectedFiles(orphanedSelected).mutate();

    // The dead selected reference should also cause the check to fail.
    proc = getCluster().exec(SystemCheck.class, "run", bulkFateCheck.name());
    assertEquals(1, proc.getProcess().waitFor());
    output = proc.readStdOut();
    assertTrue(output.contains("has selected column referencing dead FATE op"));
    assertTrue(output.contains("investigate and clean up manually"));
    assertTrue(output.contains("Check BULK_FATE completed with status FAILED"));
    assertNoOtherChecksRan(output, false, bulkFateCheck);

    // Leave the tablet metadata clean again.
    context.getAmple().mutateTablet(extent).deleteSelectedFiles().mutate();
  }
}

private String executeCheckCommand(String[] checkCmdArgs, boolean[] checksPass) throws Exception {
String output;
SystemCheck check = createDummyCheckCommand(checksPass);
Expand Down Expand Up @@ -795,6 +870,17 @@ public Check getCheck() {
}
}

// Test stand-in for the real BULK_FATE check runner: reports the pass/fail outcome supplied at
// construction (via DummyCheckRunner) instead of scanning fate stores and tablet metadata.
static class DummyBulkLoadedFateCheckRunner extends DummyCheckRunner {
public DummyBulkLoadedFateCheckRunner(boolean passes) {
super(passes);
}

// Identifies which check this dummy runner stands in for.
@Override
public Check getCheck() {
return Check.BULK_FATE;
}
}

class DummyCheckCommand extends CheckCommandOpts {
final Map<Check,Supplier<CheckRunner>> checkRunners;

Expand All @@ -813,6 +899,8 @@ public DummyCheckCommand(boolean[] checksPass) {
this.checkRunners.put(Check.SYSTEM_FILES,
() -> new DummySystemFilesCheckRunner(checksPass[6]));
this.checkRunners.put(Check.USER_FILES, () -> new DummyUserFilesCheckRunner(checksPass[7]));
this.checkRunners.put(Check.BULK_FATE,
() -> new DummyBulkLoadedFateCheckRunner(checksPass[8]));
}

@Override
Expand Down