From ed548428d336f1036cf956435bd67b9057806193 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Tue, 24 Feb 2026 18:10:07 +0000 Subject: [PATCH 01/13] Enable clients to scan offline tables using ScanServers During a normal client scan the TabletLocator resolves tablets (key extent and location) for a given search range. The location is necessary for the client to be able to create a connection with a tablet server to perform the scan, but the location is not needed when the client is using scan servers. The TabletLocator does not resolve tablets for offline tables. This change introduces the OfflineTabletLocatorImpl that performs this resolution (range -> key extents) and does not provide any location information. This change also modifies the client to allow scans on offline tables when using scan servers and uses the new OfflineTabletLocatorImpl in that code path. --- .../core/clientImpl/ClientContext.java | 7 +- .../clientImpl/OfflineTabletLocatorImpl.java | 179 ++++++++++++++++ .../accumulo/core/clientImpl/ScannerImpl.java | 10 + .../core/clientImpl/TabletLocator.java | 40 ++-- .../clientImpl/TabletServerBatchReader.java | 4 + .../java/org/apache/accumulo/gc/GCRun.java | 4 +- .../gc/GarbageCollectWriteAheadLogs.java | 4 +- .../test/ScanServerAllowedTablesIT.java | 4 +- .../apache/accumulo/test/ScanServerIT.java | 21 +- .../test/ScanServerOfflineTableIT.java | 194 ++++++++++++++++++ .../apache/accumulo/test/VerifyIngest.java | 9 + .../accumulo/test/functional/ReadWriteIT.java | 14 +- 12 files changed, 443 insertions(+), 47 deletions(-) create mode 100644 core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java create mode 100644 test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java index e5b059a80f7..7dc37c35452 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java @@ -710,8 +710,8 @@ public BatchScanner createBatchScanner(String tableName, Authorizations authoriz int numQueryThreads) throws TableNotFoundException { ensureOpen(); checkArgument(authorizations != null, "authorizations is null"); - return new TabletServerBatchReader(this, requireNotOffline(getTableId(tableName), tableName), - tableName, authorizations, numQueryThreads); + return new TabletServerBatchReader(this, getTableId(tableName), tableName, authorizations, + numQueryThreads); } @Override @@ -796,8 +796,7 @@ public Scanner createScanner(String tableName, Authorizations authorizations) throws TableNotFoundException { ensureOpen(); checkArgument(authorizations != null, "authorizations is null"); - Scanner scanner = - new ScannerImpl(this, requireNotOffline(getTableId(tableName), tableName), authorizations); + Scanner scanner = new ScannerImpl(this, getTableId(tableName), authorizations); Integer batchSize = ClientProperty.SCANNER_BATCH_SIZE.getInteger(getProperties()); if (batchSize != null) { scanner.setBatchSize(batchSize); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java new file mode 100644 index 00000000000..597d45a7de1 --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.core.clientImpl; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.PartialKey; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; +import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; +import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.hadoop.io.Text; + +public class OfflineTabletLocatorImpl extends TabletLocator { + + public static class OfflineTabletLocation extends TabletLocation { + + public static final String SERVER = "offline_table_marker"; + + public OfflineTabletLocation(KeyExtent tablet_extent) { + super(tablet_extent, SERVER, SERVER); + } + + } + + private final TableId tid; + private final TreeSet extents = new TreeSet<>(); + + public OfflineTabletLocatorImpl(ClientContext context, TableId tableId) { + tid = tableId; + if 
(context.getTableState(tid) != TableState.OFFLINE) { + throw new IllegalStateException("Table " + tableId + " is not offline"); + } + } + + private synchronized void populateExtents(ClientContext context) { + if (extents.size() > 0) { + return; + } + try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tid) + .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION).build()) { + tm.forEach(t -> { + KeyExtent ke = t.getExtent(); + Location loc = t.getLocation(); + if (loc != null && loc.getType() != LocationType.LAST) { + throw new IllegalStateException("Extent has current or future location: " + ke); + } + extents.add(ke); + }); + } + } + + @Override + public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, + boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + populateExtents(context); + + if (skipRow) { + row = new Text(row); + row.append(new byte[] {0}, 0, 1); + } + + Text metadataRow = new Text(tid.canonical()); + metadataRow.append(new byte[] {';'}, 0, 1); + metadataRow.append(row.getBytes(), 0, row.getLength()); + + Set results = KeyExtent.findOverlapping(KeyExtent.fromMetaRow(metadataRow), extents); + if (results.isEmpty()) { + // nothing found + return null; + } else { + // return the 1st (lowest) KeyExtent + return new OfflineTabletLocation(results.iterator().next()); + } + } + + @Override + public List binRanges(ClientContext context, List ranges, + Map>> binnedRanges) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + + populateExtents(context); + List tabletLocations = new ArrayList<>(ranges.size()); + List failures = new ArrayList<>(); + + l1: for (Range r : ranges) { + tabletLocations.clear(); + Text startRow; + + if (r.getStartKey() != null) { + startRow = r.getStartKey().getRow(); + } else { + startRow = new Text(); + } + + TabletLocation tl = this.locateTablet(context, startRow, false, false); + if (tl == null) { + failures.add(r); + 
continue; + } + tabletLocations.add(tl); + + while (tl.tablet_extent.endRow() != null + && !r.afterEndKey(new Key(tl.tablet_extent.endRow()).followingKey(PartialKey.ROW))) { + tl = locateTablet(context, tl.tablet_extent.endRow(), false, false); + + if (tl == null) { + failures.add(r); + continue l1; + } + tabletLocations.add(tl); + } + + // Ensure the extents found are non overlapping and have no holes. When reading some extents + // from the cache and other from the metadata table in the loop above we may end up with + // non-contiguous extents. This can happen when a subset of exents are placed in the cache and + // then after that merges and splits happen. + if (TabletLocatorImpl.isContiguous(tabletLocations)) { + for (TabletLocation tl2 : tabletLocations) { + TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, r); + } + } else { + failures.add(r); + } + + } + return failures; + } + + @Override + public void binMutations(ClientContext context, List mutations, + Map> binnedMutations, List failures) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + throw new UnsupportedOperationException(); + } + + @Override + public void invalidateCache(KeyExtent failedExtent) {} + + @Override + public void invalidateCache(Collection keySet) {} + + @Override + public void invalidateCache() {} + + @Override + public void invalidateCache(ClientContext context, String server) {} + +} diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java index b4dfaa6258d..0fef7ecaab0 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerImpl.java @@ -29,6 +29,7 @@ import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; import 
org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; @@ -163,6 +164,15 @@ public synchronized int getBatchSize() { @Override public synchronized Iterator> iterator() { ensureOpen(); + if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) { + try { + String tableName = context.getTableName(tableId); + context.requireNotOffline(tableId, tableName); + } catch (TableNotFoundException e) { + throw new RuntimeException("Table not found", e); + } + } + ScannerIterator iter = new ScannerIterator(context, tableId, authorizations, range, size, Duration.ofMillis(getTimeout(MILLISECONDS)), this, isolated, readaheadThreshold, new Reporter()); diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java index c4c1dcdcd9f..b1a4557d7f9 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java @@ -34,6 +34,7 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; +import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.MetadataLocationObtainer; import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.metadata.RootTable; @@ -110,6 +111,7 @@ public boolean equals(LocatorKey lk) { } private static final HashMap locators = new HashMap<>(); + private static final HashMap offlineLocators = new HashMap<>(); private static boolean enabled = true; public static synchronized void clearLocators() { @@ -117,6 +119,7 @@ public static synchronized void clearLocators() { locator.isValid = false; } locators.clear(); + offlineLocators.clear(); } static synchronized boolean isEnabled() { @@ -135,24 +138,31 @@ static synchronized void enable() { public 
static synchronized TabletLocator getLocator(ClientContext context, TableId tableId) { Preconditions.checkState(enabled, "The Accumulo singleton that that tracks tablet locations is " + "disabled. This is likely caused by all AccumuloClients being closed or garbage collected"); - LocatorKey key = new LocatorKey(context.getInstanceID(), tableId); - TabletLocator tl = locators.get(key); - if (tl == null) { - MetadataLocationObtainer mlo = new MetadataLocationObtainer(); - - if (RootTable.ID.equals(tableId)) { - tl = new RootTabletLocator(context.getTServerLockChecker()); - } else if (MetadataTable.ID.equals(tableId)) { - tl = new TabletLocatorImpl(MetadataTable.ID, getLocator(context, RootTable.ID), mlo, - context.getTServerLockChecker()); - } else { - tl = new TabletLocatorImpl(tableId, getLocator(context, MetadataTable.ID), mlo, - context.getTServerLockChecker()); + TableState state = context.getTableState(tableId); + if (state == TableState.OFFLINE) { + return offlineLocators.computeIfAbsent(tableId, + f -> new OfflineTabletLocatorImpl(context, tableId)); + } else { + offlineLocators.remove(tableId); + LocatorKey key = new LocatorKey(context.getInstanceID(), tableId); + TabletLocator tl = locators.get(key); + if (tl == null) { + MetadataLocationObtainer mlo = new MetadataLocationObtainer(); + + if (RootTable.ID.equals(tableId)) { + tl = new RootTabletLocator(context.getTServerLockChecker()); + } else if (MetadataTable.ID.equals(tableId)) { + tl = new TabletLocatorImpl(MetadataTable.ID, getLocator(context, RootTable.ID), mlo, + context.getTServerLockChecker()); + } else { + tl = new TabletLocatorImpl(tableId, getLocator(context, MetadataTable.ID), mlo, + context.getTServerLockChecker()); + } + locators.put(key, tl); } - locators.put(key, tl); + return tl; } - return tl; } static { diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java 
index 8b149da57fa..cb03b0b4d89 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java @@ -118,6 +118,10 @@ public Iterator> iterator() { throw new IllegalStateException("batch reader closed"); } + if (getConsistencyLevel() == ConsistencyLevel.IMMEDIATE) { + context.requireNotOffline(tableId, tableName); + } + return new TabletServerBatchReaderIterator(context, tableId, tableName, authorizations, ranges, numThreads, queryThreadPool, this, retryTimeout); } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java index 22275c9e119..f56d2a21a93 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java @@ -47,6 +47,7 @@ import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; @@ -448,7 +449,8 @@ public Iterator> getReplicationNeededIterat } return Maps.immutableEntry(file, stat); }); - } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException e) { + } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException + | TableOfflineException e) { // No elements that we need to preclude return Collections.emptyIterator(); } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java index 4d9c4e745a6..39aec22ca2a 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java +++ 
b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java @@ -38,6 +38,7 @@ import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Value; @@ -448,7 +449,8 @@ protected int removeReplicationEntries(Map candidates) { candidates.remove(id); log.info("Ignore closed log " + id + " because it is being replicated"); } - } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException ex) { + } catch (org.apache.accumulo.core.replication.ReplicationTableOfflineException + | TableOfflineException ex) { return candidates.size(); } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java index e14834cf483..a4039ca36a5 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java @@ -154,8 +154,8 @@ public static enum ScannerType { BATCH_SCANNER, SCANNER; } - private ScannerBase createScanner(AccumuloClient client, ScannerType stype, String tableName) - throws TableNotFoundException { + public static ScannerBase createScanner(AccumuloClient client, ScannerType stype, + String tableName) throws TableNotFoundException { switch (stype) { case BATCH_SCANNER: BatchScanner batchScanner = client.createBatchScanner(tableName, Authorizations.EMPTY); diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java index 77cdc8c6478..b271ae22864 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerIT.java @@ -35,7 +35,6 @@ import 
org.apache.accumulo.core.client.IteratorSetting; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; -import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.conf.ClientProperty; @@ -146,24 +145,6 @@ public void testBatchScan() throws Exception { } } - @Test - public void testScanOfflineTable() throws Exception { - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - String tableName = getUniqueNames(1)[0]; - - createTableAndIngest(client, tableName, null, 10, 10, "colf"); - client.tableOperations().offline(tableName, true); - - assertThrows(TableOfflineException.class, () -> { - try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) { - scanner.setRange(new Range()); - scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); - assertEquals(100, Iterables.size(scanner)); - } // when the scanner is closed, all open sessions should be closed - }); - } - } - @Test @Timeout(value = 20) public void testBatchScannerTimeout() throws Exception { @@ -232,7 +213,7 @@ public static int createTableAndIngest(AccumuloClient client, String tableName, */ public static int ingest(AccumuloClient client, String tableName, int rowCount, int colCount, int offset, String colf, boolean shouldFlush) throws Exception { - ReadWriteIT.ingest(client, colCount, rowCount, 50, offset, colf, tableName); + ReadWriteIT.ingest(client, rowCount, colCount, 50, offset, colf, tableName); final int ingestedEntriesCount = colCount * rowCount; diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java new file mode 100644 index 00000000000..b9900f348ae --- /dev/null +++ 
b/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.SortedSet; +import java.util.TreeSet; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.clientImpl.TabletLocator; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.ScanServerAllowedTablesIT.ScannerType; +import 
org.apache.accumulo.test.functional.ReadWriteIT; +import org.apache.hadoop.io.Text; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import com.google.common.collect.Iterables; + +public class ScanServerOfflineTableIT extends SharedMiniClusterBase { + + private static class ScanServerOfflineITConfiguration + implements MiniClusterConfigurationCallback { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, + org.apache.hadoop.conf.Configuration coreSite) { + cfg.setNumScanServers(1); + + // Timeout scan sessions after being idle for 3 seconds + cfg.setProperty(Property.TSERV_SESSION_MAXIDLE, "3s"); + + // Configure the scan server to only have 1 scan executor thread. This means + // that the scan server will run scans serially, not concurrently. + cfg.setProperty(Property.SSERV_SCAN_EXECUTORS_DEFAULT_THREADS, "1"); + } + } + + @BeforeAll + public static void start() throws Exception { + ScanServerOfflineITConfiguration c = new ScanServerOfflineITConfiguration(); + SharedMiniClusterBase.startMiniClusterWithConfig(c); + SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER, + "localhost"); + + String zooRoot = getCluster().getServerContext().getZooKeeperRoot(); + ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter(); + String scanServerRoot = zooRoot + Constants.ZSSERVERS; + + while (zrw.getChildren(scanServerRoot).size() == 0) { + Thread.sleep(500); + } + } + + @AfterAll + public static void stop() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + } + + @ParameterizedTest + @EnumSource(value = ScanServerAllowedTablesIT.ScannerType.class) + public void testSimpleScan(ScannerType stype) throws Exception { + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0] + 
stype.name(); + + final int ingestedEntryCount = + ScanServerIT.createTableAndIngest(client, tableName, null, 10, 10, "colf"); + client.tableOperations().offline(tableName, true); + + // This isn't necessary, but will ensure that the TabletLocator is cleared + // Invalidate the TabletLocator for the offline table + TabletLocator.getLocator((ClientContext) client, + TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache(); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The scan server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + } + } + + @ParameterizedTest + @EnumSource(value = ScanServerAllowedTablesIT.ScannerType.class) + public void testScan(ScannerType stype) throws Exception { + + final int rows = 500; + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + String tableName = getUniqueNames(1)[0] + stype.name(); + + final int ingestedEntryCount = + ScanServerIT.createTableAndIngest(client, tableName, null, rows, 10, "colf"); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The tablet server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + ReadWriteIT.verify(client, rows, 10, 50, 0, tableName); + + client.tableOperations().offline(tableName, true); + + // This isn't necessary, but will ensure that the TabletLocator is cleared + // Invalidate the TabletLocator for the offline table + TabletLocator.getLocator((ClientContext) client, + 
TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache(); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The scan server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + ReadWriteIT.verifyEventual(client, rows, 10, 50, 0, tableName); + + client.tableOperations().online(tableName, true); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The tablet server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + ReadWriteIT.verify(client, rows, 10, 50, 0, tableName); + + // Add some splits to the table + SortedSet splits = new TreeSet<>(); + for (int i = 0; i < rows; i++) { + splits.add(new Text("row_" + String.format("%010d", i))); + } + client.tableOperations().addSplits(tableName, splits); + client.instanceOperations().waitForBalance(); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.IMMEDIATE); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The tablet server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + ReadWriteIT.verify(client, rows, 10, 50, 0, tableName); + + client.tableOperations().offline(tableName, true); + + // This isn't necessary, but will ensure that the TabletLocator is cleared + // Invalidate the TabletLocator for the offline table + TabletLocator.getLocator((ClientContext) client, + 
TableId.of(client.tableOperations().tableIdMap().get(tableName))).invalidateCache(); + + try ( + ScannerBase scanner = ScanServerAllowedTablesIT.createScanner(client, stype, tableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertEquals(ingestedEntryCount, Iterables.size(scanner), + "The scan server scanner should have seen all ingested and flushed entries"); + } // when the scanner is closed, all open sessions should be closed + ReadWriteIT.verifyEventual(client, rows, 10, 50, 0, tableName); + + } + + } + +} diff --git a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java index 092ea14a750..527ac4a95f1 100644 --- a/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java +++ b/test/src/main/java/org/apache/accumulo/test/VerifyIngest.java @@ -30,6 +30,7 @@ import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.data.Key; @@ -109,6 +110,12 @@ public static void main(String[] args) throws Exception { public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams params) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { + verifyIngest(accumuloClient, params, ConsistencyLevel.IMMEDIATE); + } + + public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams params, + ConsistencyLevel cl) + throws AccumuloException, AccumuloSecurityException, TableNotFoundException { byte[][] bytevals = TestIngest.generateValues(params.dataSize); Authorizations labelAuths = new Authorizations("L1", "L2", "G1", "GROUP2"); @@ -136,6 +143,7 @@ public static void verifyIngest(AccumuloClient accumuloClient, 
VerifyParams para Text colq = new Text("col_" + String.format("%07d", expectedCol)); try (Scanner scanner = accumuloClient.createScanner("test_ingest", labelAuths)) { + scanner.setConsistencyLevel(cl); scanner.setBatchSize(1); Key startKey = new Key(rowKey, colf, colq); Range range = new Range(startKey, startKey.followingKey(PartialKey.ROW_COLFAM_COLQUAL)); @@ -181,6 +189,7 @@ public static void verifyIngest(AccumuloClient accumuloClient, VerifyParams para Key startKey = new Key(new Text("row_" + String.format("%010d", expectedRow))); try (Scanner scanner = accumuloClient.createScanner(params.tableName, labelAuths)) { + scanner.setConsistencyLevel(cl); scanner.setRange(new Range(startKey, endKey)); for (int j = 0; j < params.cols; j++) { scanner.fetchColumn(new Text(params.columnFamily), diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java index 44eb5b23743..c4cb9574a97 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java @@ -62,6 +62,7 @@ import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.client.admin.NewTableConfiguration; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.clientImpl.ClientContext; @@ -211,18 +212,23 @@ public static void ingest(AccumuloClient accumuloClient, int rows, int cols, int public static void verify(AccumuloClient accumuloClient, int rows, int cols, int width, int offset, String tableName) throws Exception { - verify(accumuloClient, rows, cols, width, offset, COLF, tableName); + verify(accumuloClient, rows, cols, width, offset, COLF, tableName, ConsistencyLevel.IMMEDIATE); + } + + 
public static void verifyEventual(AccumuloClient accumuloClient, int rows, int cols, int width, + int offset, String tableName) throws Exception { + verify(accumuloClient, rows, cols, width, offset, COLF, tableName, ConsistencyLevel.EVENTUAL); } private static void verify(AccumuloClient accumuloClient, int rows, int cols, int width, - int offset, String colf, String tableName) throws Exception { + int offset, String colf, String tableName, ConsistencyLevel cl) throws Exception { VerifyParams params = new VerifyParams(accumuloClient.properties(), tableName, rows); params.rows = rows; params.dataSize = width; params.startRow = offset; params.columnFamily = colf; params.cols = cols; - VerifyIngest.verifyIngest(accumuloClient, params); + VerifyIngest.verifyIngest(accumuloClient, params, cl); } public static String[] args(String... args) { @@ -445,7 +451,7 @@ public void localityGroupChange() throws Exception { to.setLocalityGroups(table, getGroups(cfg)); to.flush(table, null, null, true); verify(accumuloClient, ROWS * i, 1, 50, 0, table); - verify(accumuloClient, ROWS * i, 1, 50, 0, "xyz", table); + verify(accumuloClient, ROWS * i, 1, 50, 0, "xyz", table, ConsistencyLevel.IMMEDIATE); i++; } } From dd9f6c8b5c3ee0b59fd51a4133132b480450d839 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Thu, 26 Feb 2026 19:14:38 +0000 Subject: [PATCH 02/13] Made Offline tablet cache configurable, fix IT --- .../clientImpl/OfflineTabletLocatorImpl.java | 175 ++++++++++++++---- .../accumulo/core/conf/ClientProperty.java | 18 +- .../test/ScanServerOfflineTableIT.java | 2 +- 3 files changed, 161 insertions(+), 34 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java index 597d45a7de1..1b5bee42927 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java +++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -18,16 +18,24 @@ */ package org.apache.accumulo.core.clientImpl; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableSet; +import java.util.Properties; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.data.Key; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.PartialKey; @@ -35,14 +43,25 @@ import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.manager.state.tables.TableState; +import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Ticker; public class OfflineTabletLocatorImpl extends TabletLocator { + private static final Logger LOG = LoggerFactory.getLogger(OfflineTabletLocatorImpl.class); + public static class 
OfflineTabletLocation extends TabletLocation { public static final String SERVER = "offline_table_marker"; @@ -53,37 +72,113 @@ public OfflineTabletLocation(KeyExtent tablet_extent) { } + private class OfflineTabletsCache implements RemovalListener { + + // This object uses a Caffeine cache to manage the duration of the extents + // cached in the TreeSet. The TreeSet is necessary for expedient operations. + + private final ClientContext context; + private final int prefetch; + private final Cache cache; + private final Set evictions = new HashSet<>(); + private final NavigableSet extents = + Collections.synchronizedNavigableSet(new TreeSet<>()); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + private OfflineTabletsCache(ClientContext context) { + this.context = context; + Properties clientProperties = ClientConfConverter.toProperties(context.getConfiguration()); + Duration cacheDuration = Duration.ofMillis( + ClientProperty.OFFLINE_LOCATOR_CACHE_DURATION.getTimeInMillis(clientProperties)); + int maxCacheSize = + Integer.parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_SIZE.getValue(clientProperties)); + prefetch = Integer + .parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_PREFETCH.getValue(clientProperties)); + cache = Caffeine.newBuilder().expireAfterAccess(cacheDuration).initialCapacity(maxCacheSize) + .maximumSize(maxCacheSize).evictionListener(this).removalListener(this) + .ticker(Ticker.systemTicker()).build(); + } + + @Override + public void onRemoval(KeyExtent key, KeyExtent value, RemovalCause cause) { + LOG.trace("Extent was evicted from cache: {}", key); + synchronized (evictions) { + evictions.add(key); + } + } + + private KeyExtent findOrLoadExtent(KeyExtent start) { + lock.readLock().lock(); + try { + KeyExtent match = extents.ceiling(start); + if (match != null && match.contains(start)) { + LOG.trace("Extent {} found in cache for start row {}", match, start); + return match; + } + } finally { + lock.readLock().unlock(); + } 
+ lock.writeLock().lock(); + // process prior evictions since we have the write lock + processEvictions(); + // Load TabletMetadata + try (TabletsMetadata tm = + context.getAmple().readTablets().forTable(tid).overlapping(start.endRow(), true, null) + .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION).build()) { + Iterator iter = tm.iterator(); + for (int i = 0; i < prefetch && iter.hasNext(); i++) { + TabletMetadata t = iter.next(); + KeyExtent ke = t.getExtent(); + Location loc = t.getLocation(); + if (loc != null && loc.getType() != LocationType.LAST) { + throw new IllegalStateException("Extent has current or future location: " + ke); + } + LOG.trace("Caching extent: {}", ke); + cache.put(ke, ke); + extents.add(ke); + } + return extents.ceiling(start); + } finally { + lock.writeLock().unlock(); + } + } + + private void processEvictions() { + synchronized (evictions) { + LOG.trace("Processing prior evictions"); + extents.removeAll(evictions); + evictions.clear(); + } + } + + private void invalidate(KeyExtent failedExtent) { + cache.invalidate(failedExtent); + } + + private void invalidate(Collection keySet) { + cache.invalidateAll(keySet); + } + + private void invalidateAll() { + cache.invalidateAll(); + } + + } + private final TableId tid; - private final TreeSet extents = new TreeSet<>(); + private final OfflineTabletsCache extentCache; public OfflineTabletLocatorImpl(ClientContext context, TableId tableId) { tid = tableId; if (context.getTableState(tid) != TableState.OFFLINE) { throw new IllegalStateException("Table " + tableId + " is not offline"); } - } - - private synchronized void populateExtents(ClientContext context) { - if (extents.size() > 0) { - return; - } - try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tid) - .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION).build()) { - tm.forEach(t -> { - KeyExtent ke = t.getExtent(); - Location loc = t.getLocation(); - if (loc != null && loc.getType() != LocationType.LAST) { - throw new 
IllegalStateException("Extent has current or future location: " + ke); - } - extents.add(ke); - }); - } + extentCache = new OfflineTabletsCache(context); } @Override public TabletLocation locateTablet(ClientContext context, Text row, boolean skipRow, boolean retry) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - populateExtents(context); if (skipRow) { row = new Text(row); @@ -94,14 +189,17 @@ public TabletLocation locateTablet(ClientContext context, Text row, boolean skip metadataRow.append(new byte[] {';'}, 0, 1); metadataRow.append(row.getBytes(), 0, row.getLength()); - Set results = KeyExtent.findOverlapping(KeyExtent.fromMetaRow(metadataRow), extents); - if (results.isEmpty()) { - // nothing found - return null; - } else { - // return the 1st (lowest) KeyExtent - return new OfflineTabletLocation(results.iterator().next()); + LOG.trace("Locating offline tablet for row: {}", metadataRow); + KeyExtent start = KeyExtent.fromMetaRow(metadataRow); + KeyExtent match = extentCache.findOrLoadExtent(start); + if (match != null) { + if (match.prevEndRow() == null || match.prevEndRow().compareTo(row) < 0) { + LOG.trace("Found match for row: {}, extent = {}", row, match); + return new OfflineTabletLocation(match); + } } + LOG.trace("Found no matching extent for row: {}", row); + return null; } @Override @@ -109,11 +207,11 @@ public List binRanges(ClientContext context, List ranges, Map>> binnedRanges) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { - populateExtents(context); List tabletLocations = new ArrayList<>(ranges.size()); List failures = new ArrayList<>(); l1: for (Range r : ranges) { + LOG.trace("Looking up locations for range: {}", r); tabletLocations.clear(); Text startRow; @@ -125,19 +223,23 @@ public List binRanges(ClientContext context, List ranges, TabletLocation tl = this.locateTablet(context, startRow, false, false); if (tl == null) { + LOG.trace("NOT FOUND first tablet in range: {}", r); 
failures.add(r); continue; } + LOG.trace("Found first tablet in range: {}, extent: {}", r, tl.tablet_extent); tabletLocations.add(tl); while (tl.tablet_extent.endRow() != null && !r.afterEndKey(new Key(tl.tablet_extent.endRow()).followingKey(PartialKey.ROW))) { - tl = locateTablet(context, tl.tablet_extent.endRow(), false, false); + tl = locateTablet(context, tl.tablet_extent.endRow(), true, false); if (tl == null) { + LOG.trace("NOT FOUND following tablet in range: {}", r); failures.add(r); continue l1; } + LOG.trace("Found following tablet in range: {}, extent: {}", r, tl.tablet_extent); tabletLocations.add(tl); } @@ -150,6 +252,7 @@ public List binRanges(ClientContext context, List ranges, TabletLocatorImpl.addRange(binnedRanges, tl2.tablet_location, tl2.tablet_extent, r); } } else { + LOG.trace("Found non-contiguous tablet in range: {}", r); failures.add(r); } @@ -165,15 +268,23 @@ public void binMutations(ClientContext context, List mut } @Override - public void invalidateCache(KeyExtent failedExtent) {} + public void invalidateCache(KeyExtent failedExtent) { + extentCache.invalidate(failedExtent); + } @Override - public void invalidateCache(Collection keySet) {} + public void invalidateCache(Collection keySet) { + extentCache.invalidate(keySet); + } @Override - public void invalidateCache() {} + public void invalidateCache() { + extentCache.invalidateAll(); + } @Override - public void invalidateCache(ClientContext context, String server) {} + public void invalidateCache(ClientContext context, String server) { + invalidateCache(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java index 34780c8a06e..b7812c8b623 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java @@ -141,7 +141,23 @@ public enum ClientProperty { "A list of span receiver classes to send 
trace spans"), @Deprecated(since = "2.1.0", forRemoval = true) TRACE_ZOOKEEPER_PATH("trace.zookeeper.path", Constants.ZTRACERS, PropertyType.PATH, - "The zookeeper node where tracers are registered", "2.0.0", false); + "The zookeeper node where tracers are registered", "2.0.0", false), + + /* + * For use with OfflineTabletLocatorImpl + */ + OFFLINE_LOCATOR_CACHE_DURATION("offline.locator.cache.duration", "10m", PropertyType.TIMEDURATION, + "Amount of time for which offline extent information should be cached in the client. The offline" + + " extent information is used when performing eventual scans on offline tables.", + "2.1.5", false), + OFFLINE_LOCATOR_CACHE_PREFETCH("offline.locator.cache.prefetch", "10", PropertyType.COUNT, + "The number of offline extents that should be pre-loaded into the cache. The offline" + + " extent information is used when performing eventual scans on offline tables.", + "2.1.5", false), + OFFLINE_LOCATOR_CACHE_SIZE("offline.locator.cache.size", "100", PropertyType.COUNT, + "The number of offline extents that should be cached in the client. 
The offline" + + " extent information is used when performing eventual scans on offline tables.", + "2.1.5", false); @Deprecated(since = "2.1.0", forRemoval = true) public static final String TRACE_SPAN_RECEIVER_PREFIX = "trace.span.receiver"; diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java index b9900f348ae..868a5332c8e 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java @@ -115,7 +115,7 @@ public void testSimpleScan(ScannerType stype) throws Exception { @EnumSource(value = ScanServerAllowedTablesIT.ScannerType.class) public void testScan(ScannerType stype) throws Exception { - final int rows = 500; + final int rows = 1000; try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { String tableName = getUniqueNames(1)[0] + stype.name(); From 3ed4585a30174366ca6f52168c9eeb5b1de204b5 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 27 Feb 2026 18:02:53 +0000 Subject: [PATCH 03/13] Implemented some PR suggestions --- .../clientImpl/OfflineTabletLocatorImpl.java | 67 +++++++++++-------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java index 1b5bee42927..908145fb648 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -21,15 +21,15 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import 
java.util.Properties; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.accumulo.core.client.AccumuloException; @@ -57,6 +57,7 @@ import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.base.Preconditions; public class OfflineTabletLocatorImpl extends TabletLocator { @@ -80,14 +81,13 @@ private class OfflineTabletsCache implements RemovalListener cache; - private final Set evictions = new HashSet<>(); - private final NavigableSet extents = - Collections.synchronizedNavigableSet(new TreeSet<>()); + private final LinkedBlockingQueue evictions = new LinkedBlockingQueue<>(); + private final TreeSet extents = new TreeSet<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private OfflineTabletsCache(ClientContext context) { this.context = context; - Properties clientProperties = ClientConfConverter.toProperties(context.getConfiguration()); + Properties clientProperties = context.getProperties(); Duration cacheDuration = Duration.ofMillis( ClientProperty.OFFLINE_LOCATOR_CACHE_DURATION.getTimeInMillis(clientProperties)); int maxCacheSize = @@ -95,24 +95,43 @@ private OfflineTabletsCache(ClientContext context) { prefetch = Integer .parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_PREFETCH.getValue(clientProperties)); cache = Caffeine.newBuilder().expireAfterAccess(cacheDuration).initialCapacity(maxCacheSize) - .maximumSize(maxCacheSize).evictionListener(this).removalListener(this) - .ticker(Ticker.systemTicker()).build(); + .maximumSize(maxCacheSize).removalListener(this).ticker(Ticker.systemTicker()).build(); } @Override public void onRemoval(KeyExtent key, KeyExtent value, RemovalCause cause) { LOG.trace("Extent was evicted from cache: {}", key); - 
synchronized (evictions) { - evictions.add(key); + evictions.add(key); + try { + if (lock.writeLock().tryLock(50, TimeUnit.MILLISECONDS)) { + try { + processEvictions(); + } finally { + lock.writeLock().unlock(); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting to acquire write lock", e); } } - private KeyExtent findOrLoadExtent(KeyExtent start) { + private void processEvictions() { + Preconditions.checkArgument(lock.writeLock().isHeldByCurrentThread()); + LOG.trace("Processing prior evictions"); + Set copy = new HashSet<>(); + evictions.drainTo(copy); + extents.removeAll(copy); + } + + private KeyExtent findOrLoadExtent(KeyExtent searchKey) { lock.readLock().lock(); try { - KeyExtent match = extents.ceiling(start); - if (match != null && match.contains(start)) { - LOG.trace("Extent {} found in cache for start row {}", match, start); + KeyExtent match = extents.ceiling(searchKey); + if (match != null && match.contains(searchKey)) { + // update access time in cache + cache.getIfPresent(match); + LOG.trace("Extent {} found in cache for start row {}", match, searchKey); return match; } } finally { @@ -123,7 +142,7 @@ private KeyExtent findOrLoadExtent(KeyExtent start) { processEvictions(); // Load TabletMetadata try (TabletsMetadata tm = - context.getAmple().readTablets().forTable(tid).overlapping(start.endRow(), true, null) + context.getAmple().readTablets().forTable(tid).overlapping(searchKey.endRow(), true, null) .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION).build()) { Iterator iter = tm.iterator(); for (int i = 0; i < prefetch && iter.hasNext(); i++) { @@ -135,22 +154,15 @@ private KeyExtent findOrLoadExtent(KeyExtent start) { } LOG.trace("Caching extent: {}", ke); cache.put(ke, ke); + TabletLocatorImpl.removeOverlapping(extents, ke); extents.add(ke); } - return extents.ceiling(start); + return extents.ceiling(searchKey); } finally { lock.writeLock().unlock(); } } - 
private void processEvictions() { - synchronized (evictions) { - LOG.trace("Processing prior evictions"); - extents.removeAll(evictions); - evictions.clear(); - } - } - private void invalidate(KeyExtent failedExtent) { cache.invalidate(failedExtent); } @@ -190,8 +202,8 @@ public TabletLocation locateTablet(ClientContext context, Text row, boolean skip metadataRow.append(row.getBytes(), 0, row.getLength()); LOG.trace("Locating offline tablet for row: {}", metadataRow); - KeyExtent start = KeyExtent.fromMetaRow(metadataRow); - KeyExtent match = extentCache.findOrLoadExtent(start); + KeyExtent searchKey = KeyExtent.fromMetaRow(metadataRow); + KeyExtent match = extentCache.findOrLoadExtent(searchKey); if (match != null) { if (match.prevEndRow() == null || match.prevEndRow().compareTo(row) < 0) { LOG.trace("Found match for row: {}, extent = {}", row, match); @@ -232,10 +244,11 @@ public List binRanges(ClientContext context, List ranges, while (tl.tablet_extent.endRow() != null && !r.afterEndKey(new Key(tl.tablet_extent.endRow()).followingKey(PartialKey.ROW))) { + KeyExtent priorExtent = tl.tablet_extent; tl = locateTablet(context, tl.tablet_extent.endRow(), true, false); if (tl == null) { - LOG.trace("NOT FOUND following tablet in range: {}", r); + LOG.trace("NOT FOUND tablet following {} in range: {}", priorExtent, r); failures.add(r); continue l1; } From 8f3ad2d76edc38c02bbae2749254f0f35d764bf0 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 27 Feb 2026 18:16:29 +0000 Subject: [PATCH 04/13] Fix build --- .../accumulo/core/clientImpl/OfflineTabletLocatorImpl.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java index 908145fb648..40c210136b3 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java +++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -130,7 +130,8 @@ private KeyExtent findOrLoadExtent(KeyExtent searchKey) { KeyExtent match = extents.ceiling(searchKey); if (match != null && match.contains(searchKey)) { // update access time in cache - cache.getIfPresent(match); + @SuppressWarnings("unused") + var unused = cache.getIfPresent(match); LOG.trace("Extent {} found in cache for start row {}", match, searchKey); return match; } From 305297ea8918d3172d979127d9f8a575a099ab1c Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 27 Feb 2026 18:47:31 +0000 Subject: [PATCH 05/13] Enable cache metrics, fix location check --- .../clientImpl/OfflineTabletLocatorImpl.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java index 40c210136b3..92f56d2cf06 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -45,8 +45,6 @@ import org.apache.accumulo.core.manager.state.tables.TableState; import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; -import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; import org.apache.hadoop.io.Text; import org.slf4j.Logger; @@ -59,6 +57,9 @@ import com.github.benmanes.caffeine.cache.Ticker; import com.google.common.base.Preconditions; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.binder.cache.CaffeineStatsCounter; + public class OfflineTabletLocatorImpl extends TabletLocator { 
private static final Logger LOG = LoggerFactory.getLogger(OfflineTabletLocatorImpl.class); @@ -95,7 +96,10 @@ private OfflineTabletsCache(ClientContext context) { prefetch = Integer .parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_PREFETCH.getValue(clientProperties)); cache = Caffeine.newBuilder().expireAfterAccess(cacheDuration).initialCapacity(maxCacheSize) - .maximumSize(maxCacheSize).removalListener(this).ticker(Ticker.systemTicker()).build(); + .maximumSize(maxCacheSize).removalListener(this).ticker(Ticker.systemTicker()) + .recordStats(() -> new CaffeineStatsCounter(Metrics.globalRegistry, + OfflineTabletsCache.class.getSimpleName())) + .build(); } @Override @@ -149,9 +153,13 @@ private KeyExtent findOrLoadExtent(KeyExtent searchKey) { for (int i = 0; i < prefetch && iter.hasNext(); i++) { TabletMetadata t = iter.next(); KeyExtent ke = t.getExtent(); - Location loc = t.getLocation(); - if (loc != null && loc.getType() != LocationType.LAST) { - throw new IllegalStateException("Extent has current or future location: " + ke); + if (t.getLocation() != null) { + if (context.getTableState(tid) == TableState.ONLINE) { + throw new IllegalStateException( + "Cannot continue scan with OfflineTabletLocator, table is now online"); + } + throw new IllegalStateException( + "Extent " + ke + " has current or future location, but table is not online"); } LOG.trace("Caching extent: {}", ke); cache.put(ke, ke); From 63b9d1748b7fa05699fa4b0a950a381ea29abea0 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 27 Feb 2026 18:55:26 +0000 Subject: [PATCH 06/13] Add logging for time it takes to load tablets --- .../core/clientImpl/OfflineTabletLocatorImpl.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java index 92f56d2cf06..4de65d9455a 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -46,6 +46,7 @@ import org.apache.accumulo.core.metadata.schema.TabletMetadata; import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType; import org.apache.accumulo.core.metadata.schema.TabletsMetadata; +import org.apache.accumulo.core.util.Timer; import org.apache.hadoop.io.Text; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,6 +86,7 @@ private class OfflineTabletsCache implements RemovalListener evictions = new LinkedBlockingQueue<>(); private final TreeSet extents = new TreeSet<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Timer scanTimer = Timer.startNew(); private OfflineTabletsCache(ClientContext context) { this.context = context; @@ -146,6 +148,10 @@ private KeyExtent findOrLoadExtent(KeyExtent searchKey) { // process prior evictions since we have the write lock processEvictions(); // Load TabletMetadata + if (LOG.isDebugEnabled()) { + scanTimer.restart(); + } + int added = 0; try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tid).overlapping(searchKey.endRow(), true, null) .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION).build()) { @@ -165,6 +171,11 @@ private KeyExtent findOrLoadExtent(KeyExtent searchKey) { cache.put(ke, ke); TabletLocatorImpl.removeOverlapping(extents, ke); extents.add(ke); + added++; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Took {}ms to scan and load {} metadata tablets for table {}", + scanTimer.elapsed(TimeUnit.MILLISECONDS) , added , tid); } return extents.ceiling(searchKey); } finally { From 560323037d77ffcc4c4bf8bb078c46f1b4239ff8 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Fri, 27 Feb 2026 18:59:31 +0000 Subject: [PATCH 07/13] Fix formatting --- .../accumulo/core/clientImpl/OfflineTabletLocatorImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 
deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java index 4de65d9455a..e5e0a05578a 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -175,7 +175,7 @@ private KeyExtent findOrLoadExtent(KeyExtent searchKey) { } if (LOG.isDebugEnabled()) { LOG.debug("Took {}ms to scan and load {} metadata tablets for table {}", - scanTimer.elapsed(TimeUnit.MILLISECONDS) , added , tid); + scanTimer.elapsed(TimeUnit.MILLISECONDS), added, tid); } return extents.ceiling(searchKey); } finally { From 40cafc06311414462fee20cbb1c971f1b60767de Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 2 Mar 2026 17:32:41 +0000 Subject: [PATCH 08/13] Changes for correctness, dealing with cache max size behavior --- .../clientImpl/OfflineTabletLocatorImpl.java | 62 ++++++++++++++++--- 1 file changed, 52 insertions(+), 10 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java index e5e0a05578a..b4ee4b5b8b5 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -30,6 +30,7 @@ import java.util.TreeSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.accumulo.core.client.AccumuloException; @@ -53,6 +54,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Policy.Eviction; import 
com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; import com.github.benmanes.caffeine.cache.Ticker; @@ -81,37 +83,54 @@ private class OfflineTabletsCache implements RemovalListener cache; private final LinkedBlockingQueue evictions = new LinkedBlockingQueue<>(); private final TreeSet extents = new TreeSet<>(); private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); private final Timer scanTimer = Timer.startNew(); + private final AtomicInteger cacheCount = new AtomicInteger(0); + private final Eviction evictionPolicy; private OfflineTabletsCache(ClientContext context) { this.context = context; Properties clientProperties = context.getProperties(); Duration cacheDuration = Duration.ofMillis( ClientProperty.OFFLINE_LOCATOR_CACHE_DURATION.getTimeInMillis(clientProperties)); - int maxCacheSize = + maxCacheSize = Integer.parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_SIZE.getValue(clientProperties)); prefetch = Integer .parseInt(ClientProperty.OFFLINE_LOCATOR_CACHE_PREFETCH.getValue(clientProperties)); + + // This cache is used to evict KeyExtents from the extents TreeSet when + // they have not been accessed in cacheDuration. We are targeting to have + // maxCacheSize objects in the cache, but are not using the Cache's maximumSize + // to achieve this as the Cache will remove things from the Cache that were + // newly inserted and not yet used. This negates the pre-fetching feature + // that we have added into this TabletLocator for offline tables. Here we + // set the maximum size much larger that the property and use the cacheCount + // variable to manage the max size manually. 
cache = Caffeine.newBuilder().expireAfterAccess(cacheDuration).initialCapacity(maxCacheSize) - .maximumSize(maxCacheSize).removalListener(this).ticker(Ticker.systemTicker()) + .maximumSize(maxCacheSize * 2).removalListener(this).ticker(Ticker.systemTicker()) .recordStats(() -> new CaffeineStatsCounter(Metrics.globalRegistry, OfflineTabletsCache.class.getSimpleName())) .build(); + evictionPolicy = cache.policy().eviction().orElseThrow(); } @Override public void onRemoval(KeyExtent key, KeyExtent value, RemovalCause cause) { - LOG.trace("Extent was evicted from cache: {}", key); + if (cause == RemovalCause.REPLACED) { + return; + } + LOG.trace("Extent {} was evicted from cache for {} ", key, cause); + cacheCount.decrementAndGet(); evictions.add(key); try { - if (lock.writeLock().tryLock(50, TimeUnit.MILLISECONDS)) { + if (lock.writeLock().tryLock(1, TimeUnit.MILLISECONDS)) { try { - processEvictions(); + processRecentCacheEvictions(); } finally { lock.writeLock().unlock(); } @@ -122,19 +141,22 @@ public void onRemoval(KeyExtent key, KeyExtent value, RemovalCause cause) { } } - private void processEvictions() { + private void processRecentCacheEvictions() { Preconditions.checkArgument(lock.writeLock().isHeldByCurrentThread()); - LOG.trace("Processing prior evictions"); Set copy = new HashSet<>(); evictions.drainTo(copy); - extents.removeAll(copy); + int numEvictions = copy.size(); + if (numEvictions > 0) { + LOG.trace("Processing {} prior evictions", numEvictions); + extents.removeAll(copy); + } } private KeyExtent findOrLoadExtent(KeyExtent searchKey) { lock.readLock().lock(); try { KeyExtent match = extents.ceiling(searchKey); - if (match != null && match.contains(searchKey)) { + if (match != null && match.contains(searchKey.endRow())) { // update access time in cache @SuppressWarnings("unused") var unused = cache.getIfPresent(match); @@ -146,7 +168,26 @@ private KeyExtent findOrLoadExtent(KeyExtent searchKey) { } lock.writeLock().lock(); // process prior evictions 
since we have the write lock - processEvictions(); + processRecentCacheEvictions(); + // The following block of code fixes an issue with + // the cache where recently pre-fetched extents + // will be evicted from the cache when it reaches + // the maxCacheSize. This is because from the cache's + // perspective they are the coldest objects. The code + // below manually removes the coldest extents that are + // before the searchKey.endRow to make room for the next + // batch of extents that we are going to load into the + // cache so that they are not immediately evicted. + if (cacheCount.get() + prefetch + 1 >= maxCacheSize) { + int evictionSize = prefetch * 2; + Set candidates = new HashSet<>(evictionPolicy.coldest(evictionSize).keySet()); + LOG.trace("Cache near max size, evaluating {} coldest entries", candidates); + candidates.removeIf(ke -> ke.contains(searchKey.endRow()) || ke.endRow() == null + || ke.endRow().compareTo(searchKey.endRow()) >= 0); + LOG.trace("Manually evicting coldest entries: {}", candidates); + cache.invalidateAll(candidates); + cache.cleanUp(); + } // Load TabletMetadata if (LOG.isDebugEnabled()) { scanTimer.restart(); @@ -169,6 +210,7 @@ private KeyExtent findOrLoadExtent(KeyExtent searchKey) { } LOG.trace("Caching extent: {}", ke); cache.put(ke, ke); + cacheCount.incrementAndGet(); TabletLocatorImpl.removeOverlapping(extents, ke); extents.add(ke); added++; From b0f64bf5d90f7ca66fa98df3686cfa4054324f6c Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 2 Mar 2026 19:29:24 +0000 Subject: [PATCH 09/13] Change ticker option to scheduler, minor doc edits --- .../clientImpl/OfflineTabletLocatorImpl.java | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java index b4ee4b5b8b5..495e65e8854 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -57,7 +57,7 @@ import com.github.benmanes.caffeine.cache.Policy.Eviction; import com.github.benmanes.caffeine.cache.RemovalCause; import com.github.benmanes.caffeine.cache.RemovalListener; -import com.github.benmanes.caffeine.cache.Ticker; +import com.github.benmanes.caffeine.cache.Scheduler; import com.google.common.base.Preconditions; import io.micrometer.core.instrument.Metrics; @@ -79,9 +79,6 @@ public OfflineTabletLocation(KeyExtent tablet_extent) { private class OfflineTabletsCache implements RemovalListener { - // This object uses a Caffeine cache to manage the duration of the extents - // cached in the TreeSet. The TreeSet is necessary for expedient operations. - private final ClientContext context; private final int maxCacheSize; private final int prefetch; @@ -109,19 +106,26 @@ private OfflineTabletsCache(ClientContext context) { // to achieve this as the Cache will remove things from the Cache that were // newly inserted and not yet used. This negates the pre-fetching feature // that we have added into this TabletLocator for offline tables. Here we - // set the maximum size much larger that the property and use the cacheCount + // set the maximum size much larger than the property and use the cacheCount // variable to manage the max size manually. 
- cache = Caffeine.newBuilder().expireAfterAccess(cacheDuration).initialCapacity(maxCacheSize) - .maximumSize(maxCacheSize * 2).removalListener(this).ticker(Ticker.systemTicker()) + // @formatter:off + cache = Caffeine.newBuilder() + .expireAfterAccess(cacheDuration) + .initialCapacity(maxCacheSize) + .maximumSize(maxCacheSize * 2) + .removalListener(this) + .scheduler(Scheduler.systemScheduler()) .recordStats(() -> new CaffeineStatsCounter(Metrics.globalRegistry, OfflineTabletsCache.class.getSimpleName())) .build(); + // @formatter:on evictionPolicy = cache.policy().eviction().orElseThrow(); } @Override public void onRemoval(KeyExtent key, KeyExtent value, RemovalCause cause) { if (cause == RemovalCause.REPLACED) { + // Don't remove from `extents` if the object was replaced in the cache return; } LOG.trace("Extent {} was evicted from cache for {} ", key, cause); From 06b63317b290894970b0ddae4d51cd343ee9aa6f Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Mon, 2 Mar 2026 22:43:08 +0000 Subject: [PATCH 10/13] Modified cache size property to be optional --- .../clientImpl/OfflineTabletLocatorImpl.java | 33 +++++++++++++----- .../accumulo/core/conf/ClientProperty.java | 6 ++-- .../test/ScanServerOfflineTableIT.java | 34 ++++++++++++++++--- 3 files changed, 58 insertions(+), 15 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java index 495e65e8854..6e8c97517e0 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -90,7 +90,8 @@ private class OfflineTabletsCache implements RemovalListener evictionPolicy; - private OfflineTabletsCache(ClientContext context) { + private OfflineTabletsCache(ClientContext context) + throws TableNotFoundException, AccumuloSecurityException, AccumuloException { 
this.context = context; Properties clientProperties = context.getProperties(); Duration cacheDuration = Duration.ofMillis( @@ -109,17 +110,25 @@ private OfflineTabletsCache(ClientContext context) { // set the maximum size much larger than the property and use the cacheCount // variable to manage the max size manually. // @formatter:off - cache = Caffeine.newBuilder() + Caffeine builder = Caffeine.newBuilder() .expireAfterAccess(cacheDuration) - .initialCapacity(maxCacheSize) - .maximumSize(maxCacheSize * 2) .removalListener(this) .scheduler(Scheduler.systemScheduler()) .recordStats(() -> new CaffeineStatsCounter(Metrics.globalRegistry, - OfflineTabletsCache.class.getSimpleName())) - .build(); + OfflineTabletsCache.class.getSimpleName())); + if (maxCacheSize > 0) { + builder.initialCapacity(maxCacheSize).maximumSize(maxCacheSize * 2); + } else { + String tname = context.getTableName(tid); + builder.initialCapacity(context.tableOperations().listSplits(tname).size()); + } + cache = builder.build(); // @formatter:on - evictionPolicy = cache.policy().eviction().orElseThrow(); + if (maxCacheSize > 0) { + evictionPolicy = cache.policy().eviction().orElseThrow(); + } else { + evictionPolicy = null; + } } @Override @@ -182,7 +191,7 @@ private KeyExtent findOrLoadExtent(KeyExtent searchKey) { // before the searchKey.endRow to make room for the next // batch of extents that we are going to load into the // cache so that they are not immediately evicted. 
- if (cacheCount.get() + prefetch + 1 >= maxCacheSize) { + if (maxCacheSize > 0 && cacheCount.get() + prefetch + 1 >= maxCacheSize) { int evictionSize = prefetch * 2; Set candidates = new HashSet<>(evictionPolicy.coldest(evictionSize).keySet()); LOG.trace("Cache near max size, evaluating {} coldest entries", candidates); @@ -251,7 +260,13 @@ public OfflineTabletLocatorImpl(ClientContext context, TableId tableId) { if (context.getTableState(tid) != TableState.OFFLINE) { throw new IllegalStateException("Table " + tableId + " is not offline"); } - extentCache = new OfflineTabletsCache(context); + try { + extentCache = new OfflineTabletsCache(context); + } catch (TableNotFoundException e) { + throw new IllegalStateException("Table " + tableId + " does not exist", e); + } catch (AccumuloSecurityException | AccumuloException e) { + throw new IllegalStateException("Unable to get split points for table: " + tableId, e); + } } @Override diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java index b7812c8b623..9edb7665969 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java @@ -154,9 +154,11 @@ public enum ClientProperty { "The number of offline extents that should be pre-loaded into the cache. The offline" + " extent information is used when performing eventual scans on offline tables.", "2.1.5", false), - OFFLINE_LOCATOR_CACHE_SIZE("offline.locator.cache.size", "100", PropertyType.COUNT, + @Experimental + OFFLINE_LOCATOR_CACHE_SIZE("offline.locator.cache.size", "0", PropertyType.COUNT, "The number of offline extents that should be cached in the client. The offline" - + " extent information is used when performing eventual scans on offline tables.", + + " extent information is used when performing eventual scans on offline tables. 
The" + + " value zero disables the size limitation.", "2.1.5", false); @Deprecated(since = "2.1.0", forRemoval = true) diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java index 868a5332c8e..e9056cea5cb 100644 --- a/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerOfflineTableIT.java @@ -20,8 +20,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; import java.util.SortedSet; import java.util.TreeSet; +import java.util.stream.Stream; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Accumulo; @@ -30,6 +34,7 @@ import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; import org.apache.accumulo.core.clientImpl.ClientContext; import org.apache.accumulo.core.clientImpl.TabletLocator; +import org.apache.accumulo.core.conf.ClientProperty; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; @@ -42,7 +47,11 @@ import org.apache.hadoop.io.Text; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.EnumSource; import com.google.common.collect.Iterables; @@ -111,14 +120,31 @@ public void testSimpleScan(ScannerType stype) throws Exception { } } + public static class ArgProvider implements ArgumentsProvider { + + @Override + public Stream provideArguments(ExtensionContext context) throws Exception { + 
List args = new ArrayList<>(); + args.add(Arguments.of(ScannerType.BATCH_SCANNER, 0)); + args.add(Arguments.of(ScannerType.SCANNER, 0)); + args.add(Arguments.of(ScannerType.BATCH_SCANNER, 100)); + args.add(Arguments.of(ScannerType.SCANNER, 100)); + return args.stream(); + } + + } + @ParameterizedTest - @EnumSource(value = ScanServerAllowedTablesIT.ScannerType.class) - public void testScan(ScannerType stype) throws Exception { + @ArgumentsSource(ArgProvider.class) + public void testScan(ScannerType stype, int maxCacheSize) throws Exception { final int rows = 1000; - try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { - String tableName = getUniqueNames(1)[0] + stype.name(); + final Properties p = getClientProps(); + p.put(ClientProperty.OFFLINE_LOCATOR_CACHE_SIZE.getKey(), Integer.toString(maxCacheSize)); + + try (AccumuloClient client = Accumulo.newClient().from(p).build()) { + String tableName = getUniqueNames(1)[0] + stype.name() + "_" + maxCacheSize; final int ingestedEntryCount = ScanServerIT.createTableAndIngest(client, tableName, null, rows, 10, "colf"); From bf0ba6e757374f74d9c1d009e11b5d50ac163a4f Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Tue, 3 Mar 2026 15:23:54 +0000 Subject: [PATCH 11/13] Implemented PR suggestions --- .../core/clientImpl/OfflineTabletLocatorImpl.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java index 6e8c97517e0..44e4fe30c06 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -213,14 +213,6 @@ private KeyExtent findOrLoadExtent(KeyExtent searchKey) { for (int i = 0; i < prefetch && iter.hasNext(); i++) { TabletMetadata t = iter.next(); KeyExtent ke = 
t.getExtent(); - if (t.getLocation() != null) { - if (context.getTableState(tid) == TableState.ONLINE) { - throw new IllegalStateException( - "Cannot continue scan with OfflineTabletLocator, table is now online"); - } - throw new IllegalStateException( - "Extent " + ke + " has current or future location, but table is not online"); - } LOG.trace("Caching extent: {}", ke); cache.put(ke, ke); cacheCount.incrementAndGet(); @@ -286,7 +278,7 @@ public TabletLocation locateTablet(ClientContext context, Text row, boolean skip KeyExtent searchKey = KeyExtent.fromMetaRow(metadataRow); KeyExtent match = extentCache.findOrLoadExtent(searchKey); if (match != null) { - if (match.prevEndRow() == null || match.prevEndRow().compareTo(row) < 0) { + if (match.contains(row)) { LOG.trace("Found match for row: {}, extent = {}", row, match); return new OfflineTabletLocation(match); } From 93820ba28018bcd07ce2d35839b4abf6caabca76 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Tue, 3 Mar 2026 16:06:24 +0000 Subject: [PATCH 12/13] Fix try/finally for write lock --- .../clientImpl/OfflineTabletLocatorImpl.java | 86 ++++++++++--------- 1 file changed, 44 insertions(+), 42 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java index 44e4fe30c06..de7b592d26d 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineTabletLocatorImpl.java @@ -180,51 +180,53 @@ private KeyExtent findOrLoadExtent(KeyExtent searchKey) { lock.readLock().unlock(); } lock.writeLock().lock(); - // process prior evictions since we have the write lock - processRecentCacheEvictions(); - // The following block of code fixes an issue with - // the cache where recently pre-fetched extents - // will be evicted from the cache when it reaches - // the maxCacheSize. 
This is because from the cache's - // perspective they are the coldest objects. The code - // below manually removes the coldest extents that are - // before the searchKey.endRow to make room for the next - // batch of extents that we are going to load into the - // cache so that they are not immediately evicted. - if (maxCacheSize > 0 && cacheCount.get() + prefetch + 1 >= maxCacheSize) { - int evictionSize = prefetch * 2; - Set candidates = new HashSet<>(evictionPolicy.coldest(evictionSize).keySet()); - LOG.trace("Cache near max size, evaluating {} coldest entries", candidates); - candidates.removeIf(ke -> ke.contains(searchKey.endRow()) || ke.endRow() == null - || ke.endRow().compareTo(searchKey.endRow()) >= 0); - LOG.trace("Manually evicting coldest entries: {}", candidates); - cache.invalidateAll(candidates); - cache.cleanUp(); - } - // Load TabletMetadata - if (LOG.isDebugEnabled()) { - scanTimer.restart(); - } - int added = 0; - try (TabletsMetadata tm = - context.getAmple().readTablets().forTable(tid).overlapping(searchKey.endRow(), true, null) - .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION).build()) { - Iterator iter = tm.iterator(); - for (int i = 0; i < prefetch && iter.hasNext(); i++) { - TabletMetadata t = iter.next(); - KeyExtent ke = t.getExtent(); - LOG.trace("Caching extent: {}", ke); - cache.put(ke, ke); - cacheCount.incrementAndGet(); - TabletLocatorImpl.removeOverlapping(extents, ke); - extents.add(ke); - added++; + try { + // process prior evictions since we have the write lock + processRecentCacheEvictions(); + // The following block of code fixes an issue with + // the cache where recently pre-fetched extents + // will be evicted from the cache when it reaches + // the maxCacheSize. This is because from the cache's + // perspective they are the coldest objects. 
The code + // below manually removes the coldest extents that are + // before the searchKey.endRow to make room for the next + // batch of extents that we are going to load into the + // cache so that they are not immediately evicted. + if (maxCacheSize > 0 && cacheCount.get() + prefetch + 1 >= maxCacheSize) { + int evictionSize = prefetch * 2; + Set candidates = new HashSet<>(evictionPolicy.coldest(evictionSize).keySet()); + LOG.trace("Cache near max size, evaluating {} coldest entries", candidates); + candidates.removeIf(ke -> ke.contains(searchKey.endRow()) || ke.endRow() == null + || ke.endRow().compareTo(searchKey.endRow()) >= 0); + LOG.trace("Manually evicting coldest entries: {}", candidates); + cache.invalidateAll(candidates); + cache.cleanUp(); } + // Load TabletMetadata if (LOG.isDebugEnabled()) { - LOG.debug("Took {}ms to scan and load {} metadata tablets for table {}", - scanTimer.elapsed(TimeUnit.MILLISECONDS), added, tid); + scanTimer.restart(); + } + int added = 0; + try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tid) + .overlapping(searchKey.endRow(), true, null) + .fetch(ColumnType.PREV_ROW, ColumnType.LOCATION).build()) { + Iterator iter = tm.iterator(); + for (int i = 0; i < prefetch && iter.hasNext(); i++) { + TabletMetadata t = iter.next(); + KeyExtent ke = t.getExtent(); + LOG.trace("Caching extent: {}", ke); + cache.put(ke, ke); + cacheCount.incrementAndGet(); + TabletLocatorImpl.removeOverlapping(extents, ke); + extents.add(ke); + added++; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Took {}ms to scan and load {} metadata tablets for table {}", + scanTimer.elapsed(TimeUnit.MILLISECONDS), added, tid); + } + return extents.ceiling(searchKey); } - return extents.ceiling(searchKey); } finally { lock.writeLock().unlock(); } From 206b1a8ed8c92ff2fcc333243a07b8b0e8db55c7 Mon Sep 17 00:00:00 2001 From: Dave Marion Date: Tue, 3 Mar 2026 17:02:32 +0000 Subject: [PATCH 13/13] Implemented PR suggestions, updated property 
descriptions --- .../accumulo/core/clientImpl/TabletLocator.java | 2 ++ .../accumulo/core/conf/ClientProperty.java | 16 ++++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java index b1a4557d7f9..353bfd6da04 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletLocator.java @@ -140,6 +140,8 @@ public static synchronized TabletLocator getLocator(ClientContext context, Table + "disabled. This is likely caused by all AccumuloClients being closed or garbage collected"); TableState state = context.getTableState(tableId); if (state == TableState.OFFLINE) { + LocatorKey key = new LocatorKey(context.getInstanceID(), tableId); + locators.remove(key); return offlineLocators.computeIfAbsent(tableId, f -> new OfflineTabletLocatorImpl(context, tableId)); } else { diff --git a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java index 9edb7665969..1bcf48c78cb 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/ClientProperty.java @@ -146,18 +146,22 @@ public enum ClientProperty { /* * For use with OfflineTabletLocatorImpl */ + @Experimental OFFLINE_LOCATOR_CACHE_DURATION("offline.locator.cache.duration", "10m", PropertyType.TIMEDURATION, - "Amount of time for which offline extent information should be cached in the client. The offline" - + " extent information is used when performing eventual scans on offline tables.", + "The client caches extent information for offline tables for use with eventually consistent" + + " scans and Scan Servers. 
This property controls how long the extent information is cached" + " in the client after its last use.", "2.1.5", false), + @Experimental OFFLINE_LOCATOR_CACHE_PREFETCH("offline.locator.cache.prefetch", "10", PropertyType.COUNT, - "The number of offline extents that should be pre-loaded into the cache. The offline" - + " extent information is used when performing eventual scans on offline tables.", + "The number of offline extents that should be pre-loaded into the cache. This may reduce" + + " the load on the metadata table when looking up extent information. Smaller values" + + " may make sense here in a random lookup workload, larger values in sequential scans" + + " over multiple tablets.", "2.1.5", false), @Experimental OFFLINE_LOCATOR_CACHE_SIZE("offline.locator.cache.size", "0", PropertyType.COUNT, - "The number of offline extents that should be cached in the client. The offline" - + " extent information is used when performing eventual scans on offline tables. The" + "The number of offline extents that should be cached in the client. The" + " value zero disables the size limitation.", "2.1.5", false);