Add BULK_FATE check to SystemCheck command for orphaned loaded/selected columns #6158
base: main
@@ -0,0 +1,106 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.accumulo.server.util.checkCommand;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.accumulo.core.cli.ServerOpts;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.metadata.schema.SelectedFiles;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.util.adminCommand.SystemCheck.Check;
import org.apache.accumulo.server.util.adminCommand.SystemCheck.CheckStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BulkLoadedFateCheckRunner implements CheckRunner {
  private static final Logger log = LoggerFactory.getLogger(BulkLoadedFateCheckRunner.class);
  private static final Check check = Check.BULK_FATE;

  @Override
  public CheckStatus runCheck(ServerContext context, ServerOpts opts, boolean fixFiles)
      throws Exception {
    CheckStatus status = CheckStatus.OK;
    printRunning();

    log.trace("********** Checking for orphaned bulk-import loaded columns **********");
Contributor:
This code has a race condition: a bulk import may start after creating liveFateIds and before scanning the metadata table. To fix this, we can do an initial scan of the metadata table here; this scan would be similar to the later scans, but it would only collect fate ids.

    final Set<FateId> initialMetadataIds = new HashSet<>();
    // TODO scan metadata and find all fate ids

This set can be used in the second scan of the metadata table to avoid the race condition.
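The reviewer's two-scan approach can be modeled with plain collections. This is a simplified sketch under stated assumptions: the class, method, and string fate ids below are hypothetical stand-ins, and the real check would scan the metadata table and FATE stores rather than in-memory sets.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified model of the reviewer's two-scan approach: a fate id seen in the
// metadata both before and after the live-fate snapshot, yet absent from the
// snapshot, cannot belong to an operation that started mid-check.
public class DeadFateIdModel {

  // initialScan: fate ids from the metadata scan taken BEFORE listing live fate ops
  // liveFateIds: fate ids of live operations listed from the fate stores
  // secondScan:  fate ids from the metadata scan taken AFTER listing live fate ops
  public static Set<String> findDead(Set<String> initialScan, Set<String> liveFateIds,
      Set<String> secondScan) {
    Set<String> dead = new HashSet<>();
    for (String id : secondScan) {
      // Only flag ids that already existed before the snapshot; an id that first
      // appears in the second scan may be a bulk import that started mid-check.
      if (initialScan.contains(id) && !liveFateIds.contains(id)) {
        dead.add(id);
      }
    }
    return dead;
  }

  public static void main(String[] args) {
    // "F2" existed before the snapshot and is no longer live -> flagged as dead.
    // "F3" only appeared after the snapshot -> possibly a new import, not flagged.
    Set<String> dead = findDead(Set.of("F1", "F2"), Set.of("F1"), Set.of("F1", "F2", "F3"));
    System.out.println(dead); // prints [F2]
  }
}
```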
    final Set<FateId> liveFateIds = new HashSet<>();
    try (
        MetaFateStore<BulkLoadedFateCheckRunner> mfs =
            new MetaFateStore<>(context.getZooSession(), null, null);
        UserFateStore<BulkLoadedFateCheckRunner> ufs =
            new UserFateStore<>(context, SystemTables.FATE.tableName(), null, null)) {

      mfs.list().map(ReadOnlyFateStore.FateIdStatus::getFateId).forEach(liveFateIds::add);
Contributor:
There is another list method that takes an enum set. It would be better to use that and pass in the statuses of interest. Also, we could filter the fate operations to only table compactions and bulk imports. This is not needed for correctness, it just pulls less into memory.
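The suggested status filtering can be sketched with a plain enum and EnumSet. This is a simplified model: the Status values, Op record, and listIds method are illustrative assumptions, not the actual fate store API or its TStatus values.

```java
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of listing only fate operations in selected statuses, as the
// reviewer suggests, instead of pulling every operation into memory.
public class FateListingModel {
  // Illustrative status enum; the real fate status enum has different values.
  enum Status { NEW, IN_PROGRESS, SUCCESSFUL, FAILED }

  record Op(String fateId, Status status) {}

  // Keep only ops whose status is in the given set, mirroring a list(EnumSet<...>) overload.
  static List<String> listIds(List<Op> ops, EnumSet<Status> statuses) {
    return ops.stream().filter(op -> statuses.contains(op.status())).map(Op::fateId)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Op> ops = List.of(new Op("F1", Status.IN_PROGRESS), new Op("F2", Status.SUCCESSFUL));
    System.out.println(listIds(ops, EnumSet.of(Status.NEW, Status.IN_PROGRESS))); // prints [F1]
  }
}
```

EnumSet membership checks are constant-time bit tests, so this filter adds essentially no overhead per operation.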
      ufs.list().map(ReadOnlyFateStore.FateIdStatus::getFateId).forEach(liveFateIds::add);
    }

    log.trace("Found {} live FATE operations", liveFateIds.size());

    try (
        TabletsMetadata tabletsMetadata =
            TabletsMetadata
                .builder(context).scanMetadataTable().fetch(TabletMetadata.ColumnType.LOADED,
Contributor (on lines +65 to +66):
Should use ample off the context. Also need to read from all three data levels, so would need to refactor this code to loop over the enums in Ample.DataLevel:

    context.getAmple().readTablets().forLevel(Ample.DataLevel.USER).fetch(TabletMetadata.ColumnType.LOADED,
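The refactor the reviewer describes amounts to running one scan per value of the data-level enum. A minimal sketch, assuming a stand-in DataLevel enum (Accumulo's real Ample.DataLevel also has three levels) and a placeholder scan body:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of scanning every data level rather than only the metadata table.
public class DataLevelScanModel {
  // Stand-in for Ample.DataLevel.
  enum DataLevel { ROOT, METADATA, USER }

  // Hypothetical per-level scan; a real implementation would build a tablet scan
  // per level (per the review, via context.getAmple().readTablets().forLevel(level)).
  static List<String> scanAllLevels() {
    List<String> scanned = new ArrayList<>();
    for (DataLevel level : DataLevel.values()) {
      scanned.add("scanned " + level); // placeholder for the per-level tablet scan
    }
    return scanned;
  }

  public static void main(String[] args) {
    System.out.println(scanAllLevels()); // prints [scanned ROOT, scanned METADATA, scanned USER]
  }
}
```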
                    TabletMetadata.ColumnType.SELECTED, TabletMetadata.ColumnType.PREV_ROW)
                .build()) {

      for (TabletMetadata tablet : tabletsMetadata) {

        // Check loaded columns
        Map<StoredTabletFile,FateId> loaded = tablet.getLoaded();
        for (Map.Entry<StoredTabletFile,FateId> entry : loaded.entrySet()) {
          FateId fateId = entry.getValue();
          if (!liveFateIds.contains(fateId)) {
Contributor:
Can add this check here, and a few lines down, to avoid the race condition.
||||||||
| log.warn( | ||||||||
| "Tablet {} has loaded column for file {} referencing dead FATE op {} - " | ||||||||
| + "investigate and clean up manually", | ||||||||
| tablet.getExtent(), entry.getKey().getMetadataPath(), fateId); | ||||||||
| status = CheckStatus.FAILED; | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| // Check selected columns | ||||||||
| SelectedFiles selectedFiles = tablet.getSelectedFiles(); | ||||||||
| if (selectedFiles != null) { | ||||||||
| FateId fateId = selectedFiles.getFateId(); | ||||||||
| if (!liveFateIds.contains(fateId)) { | ||||||||
| log.warn("Tablet {} has selected column referencing dead FATE op {} - " | ||||||||
| + "investigate and clean up manually", tablet.getExtent(), fateId); | ||||||||
| status = CheckStatus.FAILED; | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
| } | ||||||||
|
|
||||||||
| printCompleted(status); | ||||||||
| return status; | ||||||||
| } | ||||||||
|
|
||||||||
| @Override | ||||||||
| public Check getCheck() { | ||||||||
| return check; | ||||||||
| } | ||||||||
| } | ||||||||
Contributor:
This code is actually finding dead fate ids related to compactions and bulk imports, so the class name could be made more general.