diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index 920a490daa2a..65f3843ef26d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -27,7 +27,6 @@ import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; @@ -231,16 +230,25 @@ public Collection getUnneededFiles(long maxTs, List file ImmutableList files = storeFiles.all; // 1) We can never get rid of the last file which has the maximum seqid. // 2) Files that are not the latest can't become one due to (1), so the rest are fair game. - return files.stream().limit(Math.max(0, files.size() - 1)).filter(sf -> { - long fileTs = sf.getReader().getMaxTimestamp(); + List expiredStoreFiles = new ArrayList<>(); + for (int i = 0; i < files.size() - 1; i++) { + HStoreFile sf = files.get(i); + StoreFileReader reader = sf.getReader(); + if (reader == null) { + LOG.debug( + "Skipping store file {} with sequenceId {} while collecting unneeded files because its " + + "reader is null; expiration cutoff={}", + sf.getPath(), sf.getMaxSequenceId(), maxTs); + continue; + } + long fileTs = reader.getMaxTimestamp(); if (fileTs < maxTs && !filesCompacting.contains(sf)) { LOG.info("Found an expired store file {} whose maxTimestamp is {}, which is below {}", sf.getPath(), fileTs, maxTs); - return true; - } else { - return false; + expiredStoreFiles.add(sf); } - }).collect(Collectors.toList()); + } + return expiredStoreFiles; } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java index d83107a10b9a..6bf77d09f0ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java @@ -25,6 +25,7 @@ import java.util.Random; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreFileReader; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -50,7 +51,8 @@ public SortedCompactionPolicy(Configuration conf, StoreConfigInformation storeCo public List preSelectCompactionForCoprocessor(Collection candidates, List filesCompacting) { - return getCurrentEligibleFiles(new ArrayList<>(candidates), filesCompacting); + return filterNullReaderFiles(getCurrentEligibleFiles(new ArrayList<>(candidates), + filesCompacting), "pre-selecting compaction"); } /** @@ -72,6 +74,7 @@ public CompactionRequestImpl selectCompaction(Collection candidateFi >= storeConfigInfo.getBlockingFileCount(); candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting); + candidateSelection = filterNullReaderFiles(candidateSelection, "selecting compaction"); LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " + filesCompacting.size() + " compacting, " + candidateSelection.size() + " eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking"); @@ -175,6 +178,20 @@ protected ArrayList getCurrentEligibleFiles(ArrayList ca return candidateFiles; } + protected ArrayList filterNullReaderFiles(ArrayList candidates, + String context) { + candidates.removeIf(candidate -> { + StoreFileReader reader = candidate.getReader(); + if (reader != null) { + return false; + } + LOG.debug("Skipping store file {} with sequenceId {} while {} because its reader is null", + candidate.getPath(), candidate.getMaxSequenceId(), context); + return true; + }); + return candidates; + } + /** * @param candidates pre-filtrate * @return filtered subset exclude all files above maxCompactSize Also save all references. We diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreFileManager.java new file mode 100644 index 000000000000..c29da806b79e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultStoreFileManager.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.OptionalLong; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.CellComparatorImpl; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDefaultStoreFileManager { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDefaultStoreFileManager.class); + + @Test + public void testGetUnneededFilesSkipsNullReader() throws IOException { + DefaultStoreFileManager manager = createManager(createStoreFile("null-reader.hfile", 1, null), + createStoreFile("expired.hfile", 2, 5L), createStoreFile("latest.hfile", 3, 50L)); + BlockingDeque logs = new LinkedBlockingDeque<>(); + org.apache.logging.log4j.core.Appender appender = + mock(org.apache.logging.log4j.core.Appender.class); + when(appender.getName()).thenReturn("mockAppender"); + when(appender.isStarted()).thenReturn(true); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) { + org.apache.logging.log4j.core.LogEvent logEvent = + invocation.getArgument(0, org.apache.logging.log4j.core.LogEvent.class); + logs.add(new LevelAndMessage(logEvent.getLevel(), + logEvent.getMessage().getFormattedMessage())); + return null; + } + }).when(appender).append(any(org.apache.logging.log4j.core.LogEvent.class)); + + org.apache.logging.log4j.core.Logger logger = + (org.apache.logging.log4j.core.Logger) org.apache.logging.log4j.LogManager + .getLogger(DefaultStoreFileManager.class); + logger.addAppender(appender); + try { + Collection unneededFiles = manager.getUnneededFiles(10L, Collections.emptyList()); + assertEquals(1, unneededFiles.size()); + assertTrue( + unneededFiles.stream().anyMatch(sf -> "expired.hfile".equals(sf.getPath().getName()))); + assertFalse( + unneededFiles.stream().anyMatch(sf -> "null-reader.hfile".equals(sf.getPath().getName()))); + assertTrue(logs.stream().anyMatch(log -> log.level == org.apache.logging.log4j.Level.DEBUG + && log.message.contains("Skipping store file") + && log.message.contains("null-reader.hfile") + && log.message.contains("reader is null"))); + } finally { + logger.removeAppender(appender); + } + } + + private static DefaultStoreFileManager createManager(HStoreFile... files) throws IOException { + Configuration conf = HBaseConfiguration.create(); + CompactionConfiguration comConf = mock(CompactionConfiguration.class); + DefaultStoreFileManager manager = new DefaultStoreFileManager(CellComparatorImpl.COMPARATOR, + StoreFileComparators.SEQ_ID, conf, comConf); + manager.loadFiles(Arrays.asList(files)); + return manager; + } + + private static HStoreFile createStoreFile(String name, long sequenceId, Long maxTimestamp) { + HStoreFile storeFile = mock(HStoreFile.class); + when(storeFile.getPath()).thenReturn(new Path("/hbase/" + name)); + when(storeFile.getMaxSequenceId()).thenReturn(sequenceId); + when(storeFile.getBulkLoadTimestamp()).thenReturn(OptionalLong.empty()); + if (maxTimestamp == null) { + when(storeFile.getReader()).thenReturn(null); + } else { + StoreFileReader reader = mock(StoreFileReader.class); + when(reader.length()).thenReturn(1L); + when(reader.getMaxTimestamp()).thenReturn(maxTimestamp); + when(storeFile.getReader()).thenReturn(reader); + } + return storeFile; + } + + private static final class LevelAndMessage { + private final org.apache.logging.log4j.Level level; + private final String message; + + private LevelAndMessage(org.apache.logging.log4j.Level level, String message) { + this.level = level; + this.message = message; + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestNullReaderCompactionPolicies.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestNullReaderCompactionPolicies.java new file mode 100644 index 000000000000..353e35ee2a27 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestNullReaderCompactionPolicies.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.compactions; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.regionserver.HStoreFile; +import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; +import org.apache.hadoop.hbase.regionserver.StoreFileReader; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestNullReaderCompactionPolicies { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestNullReaderCompactionPolicies.class); + + @Test + public void testExploringSelectCompactionSkipsNullReaderFiles() throws IOException { + ExploringCompactionPolicy policy = new ExploringCompactionPolicy(createConf(), createStore()); + HStoreFile nullReaderA = createStoreFile("nullA", 1L, null, null, null); + HStoreFile nullReaderB = createStoreFile("nullB", 2L, null, null, null); + HStoreFile nullReaderC = createStoreFile("nullC", 3L, null, null, null); + HStoreFile newerA = createStoreFile("newerA", 4L, 10L, 10L, 1L); + HStoreFile newerB = createStoreFile("newerB", 5L, 10L, 10L, 1L); + HStoreFile newerC = createStoreFile("newerC", 6L, 10L, 10L, 1L); + + CompactionRequestImpl request = policy.selectCompaction( + Arrays.asList(nullReaderA, nullReaderB, nullReaderC, newerA, newerB, newerC), + Collections.emptyList(), false, false, false); + + assertEquals(Arrays.asList(newerA, newerB, newerC), new ArrayList<>(request.getFiles())); + } + + private static Configuration createConf() { + Configuration conf = HBaseConfiguration.create(); + conf.setLong(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, 100L); + conf.setLong(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_SIZE_KEY, 1L); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MIN_KEY, 2); + conf.setInt(CompactionConfiguration.HBASE_HSTORE_COMPACTION_MAX_KEY, 5); + conf.setFloat(CompactionConfiguration.HBASE_HSTORE_COMPACTION_RATIO_KEY, 1.0F); + return conf; + } + + private static StoreConfigInformation createStore() { + StoreConfigInformation store = mock(StoreConfigInformation.class); + when(store.getMemStoreFlushSize()).thenReturn(128L); + when(store.getBlockingFileCount()).thenReturn(10L); + when(store.getStoreFileTtl()).thenReturn(Long.MAX_VALUE); + when(store.getRegionInfo()).thenReturn(HRegionInfo.FIRST_META_REGIONINFO); + when(store.getColumnFamilyName()).thenReturn("cf"); + return store; + } + + private static HStoreFile createStoreFile(String name, long sequenceId, Long length, Long maxTs, + Long entries) { + HStoreFile storeFile = mock(HStoreFile.class); + when(storeFile.getPath()).thenReturn(new Path("/hbase/" + name)); + when(storeFile.getMaxSequenceId()).thenReturn(sequenceId); + when(storeFile.isReference()).thenReturn(false); + when(storeFile.excludeFromMinorCompaction()).thenReturn(false); + if (length == null) { + when(storeFile.getReader()).thenReturn(null); + } else { + StoreFileReader reader = mock(StoreFileReader.class); + when(reader.length()).thenReturn(length); + when(reader.getMaxTimestamp()).thenReturn(maxTs); + when(reader.getEntries()).thenReturn(entries.longValue()); + when(storeFile.getReader()).thenReturn(reader); + } + return storeFile; + } +}