diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/adminCommand/SystemCheck.java b/server/base/src/main/java/org/apache/accumulo/server/util/adminCommand/SystemCheck.java index af043fd4a99..8158fff6e2a 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/adminCommand/SystemCheck.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/adminCommand/SystemCheck.java @@ -33,6 +33,7 @@ import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.util.ServerKeywordExecutable; import org.apache.accumulo.server.util.adminCommand.SystemCheck.CheckCommandOpts; +import org.apache.accumulo.server.util.checkCommand.BulkLoadedFateCheckRunner; import org.apache.accumulo.server.util.checkCommand.CheckRunner; import org.apache.accumulo.server.util.checkCommand.MetadataTableCheckRunner; import org.apache.accumulo.server.util.checkCommand.RootMetadataCheckRunner; @@ -112,6 +113,9 @@ public enum Check { "Checks that files in system tablet metadata exist in DFS", Collections.singletonList(ROOT_TABLE)), USER_FILES(UserFilesCheckRunner::new, "Checks that files in user tablet metadata exist in DFS", + Collections.singletonList(METADATA_TABLE)), + BULK_FATE(BulkLoadedFateCheckRunner::new, + "Checks that loaded columns in tablet metadata reference live fate operations", Collections.singletonList(METADATA_TABLE)); private final Supplier checkRunner; diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/BulkLoadedFateCheckRunner.java b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/BulkLoadedFateCheckRunner.java new file mode 100644 index 00000000000..dfae4585b5d --- /dev/null +++ b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/BulkLoadedFateCheckRunner.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.server.util.checkCommand; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.cli.ServerOpts; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.ReadOnlyFateStore; +import org.apache.accumulo.core.fate.user.UserFateStore; +import org.apache.accumulo.core.fate.zookeeper.MetaFateStore; +import org.apache.accumulo.core.metadata.StoredTabletFile; +import org.apache.accumulo.core.metadata.SystemTables; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.server.ServerContext; +import org.apache.accumulo.server.util.adminCommand.SystemCheck.Check; +import org.apache.accumulo.server.util.adminCommand.SystemCheck.CheckStatus; + +public class BulkLoadedFateCheckRunner implements CheckRunner { + private static final Check check = Check.BULK_FATE; + + @Override + public CheckStatus runCheck(ServerContext context, ServerOpts opts, boolean fixFiles) + throws Exception { + CheckStatus status = CheckStatus.OK; + printRunning(); + + log.trace("********** Checking for orphaned bulk-import loaded columns **********"); + + final Set liveFateIds = new HashSet<>(); + try ( + MetaFateStore mfs = + new MetaFateStore<>(context.getZooSession(), null, null); + UserFateStore ufs = + new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null)) { + + mfs.list().map(ReadOnlyFateStore.FateIdStatus::getFateId).forEach(liveFateIds::add); + ufs.list().map(ReadOnlyFateStore.FateIdStatus::getFateId).forEach(liveFateIds::add); + } + + log.trace("Found {} live FATE operations", liveFateIds.size()); + + try ( + TabletsMetadata tabletsMetadata = + TabletsMetadata + .builder(context).scanMetadataTable().fetch(TabletMetadata.ColumnType.LOADED, + TabletMetadata.ColumnType.SELECTED, TabletMetadata.ColumnType.PREV_ROW) + .build()) { + + for (TabletMetadata tablet : tabletsMetadata) { + + // Check loaded columns + Map loaded = tablet.getLoaded(); + for (Map.Entry entry : loaded.entrySet()) { + FateId fateId = entry.getValue(); + if (!liveFateIds.contains(fateId)) { + log.warn( + "Tablet {} has loaded column for file {} referencing dead FATE op {} - " + + "investigate and clean up manually", + tablet.getExtent(), entry.getKey().getMetadataPath(), fateId); + status = CheckStatus.FAILED; + } + } + + // Check selected columns + SelectedFiles selectedFiles = tablet.getSelectedFiles(); + if (selectedFiles != null) { + FateId fateId = selectedFiles.getFateId(); + if (!liveFateIds.contains(fateId)) { + log.warn("Tablet {} has selected column referencing dead FATE op {} - " + + "investigate and clean up manually", tablet.getExtent(), fateId); + status = CheckStatus.FAILED; + } + } + } + } + + printCompleted(status); + return status; + } + + @Override + public Check getCheck() { + return check; + } +} diff --git a/test/src/main/java/org/apache/accumulo/test/SystemCheckIT.java b/test/src/main/java/org/apache/accumulo/test/SystemCheckIT.java index fd20571dadf..f64781d0ad9 100644 --- a/test/src/main/java/org/apache/accumulo/test/SystemCheckIT.java +++ b/test/src/main/java/org/apache/accumulo/test/SystemCheckIT.java @@ -35,6 +35,7 @@ import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.UUID; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -45,15 +46,24 @@ import org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.zookeeper.ZooUtil; import org.apache.accumulo.core.lock.ServiceLockPaths; +import org.apache.accumulo.core.metadata.ReferencedTabletFile; import org.apache.accumulo.core.metadata.RootTable; import org.apache.accumulo.core.metadata.StoredTabletFile; import org.apache.accumulo.core.metadata.SystemTables; import org.apache.accumulo.core.metadata.TServerInstance; +import org.apache.accumulo.core.metadata.schema.DataFileValue; import org.apache.accumulo.core.metadata.schema.MetadataSchema; import org.apache.accumulo.core.metadata.schema.RootTabletMetadata; +import org.apache.accumulo.core.metadata.schema.SelectedFiles; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.time.SteadyTime; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.log.WalStateManager; import org.apache.accumulo.server.util.adminCommand.SystemCheck; @@ -199,7 +209,8 @@ public void testAdminCheckRunNoCheckFailures() throws Exception { + "Running dummy check ROOT_TABLE\nDummy check ROOT_TABLE completed with status OK\n" + "Running dummy check METADATA_TABLE\nDummy check METADATA_TABLE completed with status OK\n" + "Running dummy check SYSTEM_FILES\nDummy check SYSTEM_FILES completed with status OK\n" - + "Running dummy check USER_FILES\nDummy check USER_FILES completed with status OK"; + + "Running dummy check USER_FILES\nDummy check USER_FILES completed with status OK\n" + + "Running dummy check BULK_FATE\nDummy check BULK_FATE completed with status OK\n"; String expRunSubRunOrder = "Running dummy check ROOT_TABLE\nDummy check ROOT_TABLE completed with status OK\n" + "Running dummy check SYSTEM_FILES\nDummy check SYSTEM_FILES completed with status OK\n" @@ -208,11 +219,11 @@ public void testAdminCheckRunNoCheckFailures() throws Exception { // printed table allowing us to ensure the table only includes what is expected String expRunAllStatusInfo = "-SYSTEM_CONFIG|OKSERVER_CONFIG|OKTABLE_LOCKS|OKROOT_METADATA|OKROOT_TABLE|OK" - + "METADATA_TABLE|OKSYSTEM_FILES|OKUSER_FILES|OK-"; + + "METADATA_TABLE|OKSYSTEM_FILES|OKUSER_FILES|OKBULK_FATE|OK-"; String expRunSubStatusInfo = "-SYSTEM_CONFIG|FILTERED_OUTSERVER_CONFIG|FILTERED_OUTTABLE_LOCKS|FILTERED_OUT" + "ROOT_METADATA|FILTERED_OUTROOT_TABLE|OKMETADATA_TABLE|FILTERED_OUT" - + "SYSTEM_FILES|OKUSER_FILES|OK-"; + + "SYSTEM_FILES|OKUSER_FILES|OKBULK_FATE|FILTERED_OUT-"; assertTrue(out1.contains(expRunAllRunOrder)); assertTrue(out2.contains(expRunAllRunOrder)); @@ -241,10 +252,12 @@ public void testAdminCheckRunNoCheckFailures() throws Exception { public void testAdminCheckRunWithCheckFailures() throws Exception { // tests running checks with some failing - boolean[] rootTableFails = new boolean[] {true, true, true, true, false, true, true, true}; - boolean[] systemConfigFails = new boolean[] {false, true, true, true, true, true, true, true}; + boolean[] rootTableFails = + new boolean[] {true, true, true, true, false, true, true, true, true}; + boolean[] systemConfigFails = + new boolean[] {false, true, true, true, true, true, true, true, true}; boolean[] userFilesAndMetadataTableFails = - new boolean[] {true, true, true, true, true, false, true, false}; + new boolean[] {true, true, true, true, true, false, true, false, true}; // run all checks with ROOT_TABLE failing: only SYSTEM_CONFIG and ROOT_METADATA should pass // the rest should be filtered out as skipped due to dependency failure @@ -269,7 +282,7 @@ public void testAdminCheckRunWithCheckFailures() throws Exception { + "Running dummy check SERVER_CONFIG\nDummy check SERVER_CONFIG completed with status OK\n" + "Running dummy check TABLE_LOCKS\nDummy check TABLE_LOCKS completed with status OK\n" + "Running dummy check ROOT_METADATA\nDummy check ROOT_METADATA completed with status OK\n" - + "Running dummy check ROOT_TABLE\nDummy check ROOT_TABLE completed with status FAILED"; + + "Running dummy check ROOT_TABLE\nDummy check ROOT_TABLE completed with status FAILED\n"; String expRunOrder2 = "Running dummy check SYSTEM_CONFIG\nDummy check SYSTEM_CONFIG completed with status FAILED\n"; String expRunOrder3And4 = @@ -295,15 +308,15 @@ public void testAdminCheckRunWithCheckFailures() throws Exception { String expStatusInfo1 = "-SYSTEM_CONFIG|OKSERVER_CONFIG|OKTABLE_LOCKS|OKROOT_METADATA|OK" + "ROOT_TABLE|FAILEDMETADATA_TABLE|SKIPPED_DEPENDENCY_FAILED" - + "SYSTEM_FILES|SKIPPED_DEPENDENCY_FAILEDUSER_FILES|SKIPPED_DEPENDENCY_FAILED-"; + + "SYSTEM_FILES|SKIPPED_DEPENDENCY_FAILEDUSER_FILES|SKIPPED_DEPENDENCY_FAILEDBULK_FATE|SKIPPED_DEPENDENCY_FAILED-"; String expStatusInfo2 = "-SYSTEM_CONFIG|FAILEDSERVER_CONFIG|SKIPPED_DEPENDENCY_FAILED" + "TABLE_LOCKS|SKIPPED_DEPENDENCY_FAILEDROOT_METADATA|SKIPPED_DEPENDENCY_FAILED" + "ROOT_TABLE|SKIPPED_DEPENDENCY_FAILEDMETADATA_TABLE|SKIPPED_DEPENDENCY_FAILED" - + "SYSTEM_FILES|SKIPPED_DEPENDENCY_FAILEDUSER_FILES|SKIPPED_DEPENDENCY_FAILED-"; + + "SYSTEM_FILES|SKIPPED_DEPENDENCY_FAILEDUSER_FILES|SKIPPED_DEPENDENCY_FAILEDBULK_FATE|SKIPPED_DEPENDENCY_FAILED-"; String expStatusInfo3And4 = "-SYSTEM_CONFIG|OKSERVER_CONFIG|FILTERED_OUTTABLE_LOCKS|FILTERED_OUT" + "ROOT_METADATA|FILTERED_OUTROOT_TABLE|OKMETADATA_TABLE|FILTERED_OUT" - + "SYSTEM_FILES|FILTERED_OUTUSER_FILES|FAILED"; + + "SYSTEM_FILES|FILTERED_OUTUSER_FILES|FAILEDBULK_FATE|FILTERED_OUT-"; assertTrue(out1.contains(expStatusInfo1)); assertTrue(out2.contains(expStatusInfo2)); @@ -640,6 +653,68 @@ public void testServerConfigCheck() throws Exception { // no simple way to test for a failure case } + @Test + public void testBulkFateCheck() throws Exception { + Check bulkFateCheck = Check.BULK_FATE; + String table = getUniqueNames(1)[0]; + + try (AccumuloClient client = Accumulo.newClient().from(getClientProperties()).build()) { + client.tableOperations().create(table); + ReadWriteIT.ingest(client, 10, 10, 10, 0, table); + client.tableOperations().flush(table, null, null, true); + + var p = getCluster().exec(SystemCheck.class, "run", bulkFateCheck.name()); + assertEquals(0, p.getProcess().waitFor()); + String out = p.readStdOut(); + assertTrue(out.contains("Check BULK_FATE completed with status OK")); + assertNoOtherChecksRan(out, false, bulkFateCheck); + + final var context = getCluster().getServerContext(); + final TableId tableId = TableId.of(context.tableOperations().tableIdMap().get(table)); + + KeyExtent firstExtent; + try (var tabletsMetadata = context.getAmple().readTablets().forTable(tableId) + .fetch(TabletMetadata.ColumnType.PREV_ROW).build()) { + firstExtent = tabletsMetadata.iterator().next().getExtent(); + } + + FateId deadLoadedFateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + ReferencedTabletFile fakeFile = ReferencedTabletFile.of(new org.apache.hadoop.fs.Path( + "hdfs://localhost:8020/accumulo/tables/" + tableId + "/t-0000/ghost-bulk.rf")); + + context.getAmple().mutateTablet(firstExtent).putFile(fakeFile, new DataFileValue(100, 10)) + .putBulkFile(fakeFile, deadLoadedFateId).mutate(); + + p = getCluster().exec(SystemCheck.class, "run", bulkFateCheck.name()); + assertEquals(1, p.getProcess().waitFor()); + out = p.readStdOut(); + assertTrue(out.contains("has loaded column for file")); + assertTrue(out.contains("referencing dead FATE op")); + assertTrue(out.contains("investigate and clean up manually")); + assertTrue(out.contains("Check BULK_FATE completed with status FAILED")); + assertNoOtherChecksRan(out, false, bulkFateCheck); + + context.getAmple().mutateTablet(firstExtent).deleteFile(fakeFile.insert()) + .deleteBulkFile(fakeFile.insert()).mutate(); + + FateId deadSelectedFateId = FateId.from(FateInstanceType.USER, UUID.randomUUID()); + StoredTabletFile storedFF = fakeFile.insert(); + SelectedFiles orphanedSelected = new SelectedFiles(Set.of(storedFF), true, deadSelectedFateId, + SteadyTime.from(1, java.util.concurrent.TimeUnit.NANOSECONDS)); + context.getAmple().mutateTablet(firstExtent).putSelectedFiles(orphanedSelected).mutate(); + + p = getCluster().exec(SystemCheck.class, "run", bulkFateCheck.name()); + assertEquals(1, p.getProcess().waitFor()); + out = p.readStdOut(); + assertTrue(out.contains("has selected column referencing dead FATE op")); + assertTrue(out.contains("investigate and clean up manually")); + assertTrue(out.contains("Check BULK_FATE completed with status FAILED")); + assertNoOtherChecksRan(out, false, bulkFateCheck); + + context.getAmple().mutateTablet(firstExtent).deleteSelectedFiles().mutate(); + } + } + private String executeCheckCommand(String[] checkCmdArgs, boolean[] checksPass) throws Exception { String output; SystemCheck check = createDummyCheckCommand(checksPass); @@ -795,6 +870,17 @@ public Check getCheck() { } } + static class DummyBulkLoadedFateCheckRunner extends DummyCheckRunner { + public DummyBulkLoadedFateCheckRunner(boolean passes) { + super(passes); + } + + @Override + public Check getCheck() { + return Check.BULK_FATE; + } + } + class DummyCheckCommand extends CheckCommandOpts { final Map> checkRunners; @@ -813,6 +899,8 @@ public DummyCheckCommand(boolean[] checksPass) { this.checkRunners.put(Check.SYSTEM_FILES, () -> new DummySystemFilesCheckRunner(checksPass[6])); this.checkRunners.put(Check.USER_FILES, () -> new DummyUserFilesCheckRunner(checksPass[7])); + this.checkRunners.put(Check.BULK_FATE, + () -> new DummyBulkLoadedFateCheckRunner(checksPass[8])); } @Override