diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java index 501209f1c902..903aa1039f8a 100644 --- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java +++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java @@ -424,74 +424,76 @@ public static List getSplits(Scan scan, SnapshotManifest manifest, Connection connection = null; RegionLocator regionLocator = null; - if (localityEnabled && useRegionLoc) { - Configuration newConf = new Configuration(conf); - newConf.setInt("hbase.hconnection.threads.max", 1); - try { + List splits = new ArrayList<>(); + try { + if (localityEnabled && useRegionLoc) { + Configuration newConf = new Configuration(conf); + newConf.setInt("hbase.hconnection.threads.max", 1); + connection = ConnectionFactory.createConnection(newConf); regionLocator = connection.getRegionLocator(htd.getTableName()); /* Get all locations for the table and cache it */ regionLocator.getAllRegionLocations(); - } finally { - if (connection != null) { - connection.close(); - } } - } - List splits = new ArrayList<>(); - for (RegionInfo hri : regionManifests) { - // load region descriptor - List hosts = null; - if (localityEnabled) { - if (regionLocator != null) { - /* Get Location from the local cache */ - HRegionLocation location = regionLocator.getRegionLocation(hri.getStartKey(), false); - - hosts = new ArrayList<>(1); - hosts.add(location.getHostname()); - } else { - hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir); + for (RegionInfo hri : regionManifests) { + // load region descriptor + List hosts = null; + if (localityEnabled) { + if (regionLocator != null) { + /* Get Location from the local cache */ + HRegionLocation location = regionLocator.getRegionLocation(hri.getStartKey(), false); + + hosts = new ArrayList<>(1); + hosts.add(location.getHostname()); + } else { + hosts = calculateLocationsForInputSplit(conf, htd, hri, tableDir); + } } - } - if (numSplits > 1) { - byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true); - for (int i = 0; i < sp.length - 1; i++) { + if (numSplits > 1) { + byte[][] sp = sa.split(hri.getStartKey(), hri.getEndKey(), numSplits, true); + for (int i = 0; i < sp.length - 1; i++) { + if ( + PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], + sp[i + 1]) + ) { + + Scan boundedScan = new Scan(scan); + if (scan.getStartRow().length == 0) { + boundedScan.withStartRow(sp[i]); + } else { + boundedScan.withStartRow( + Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]); + } + + if (scan.getStopRow().length == 0) { + boundedScan.withStopRow(sp[i + 1]); + } else { + boundedScan.withStopRow(Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 + ? scan.getStopRow() + : sp[i + 1]); + } + + splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir)); + } + } + } else { if ( - PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), sp[i], sp[i + 1]) + PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), + hri.getStartKey(), hri.getEndKey()) ) { - Scan boundedScan = new Scan(scan); - if (scan.getStartRow().length == 0) { - boundedScan.withStartRow(sp[i]); - } else { - boundedScan.withStartRow( - Bytes.compareTo(scan.getStartRow(), sp[i]) > 0 ? scan.getStartRow() : sp[i]); - } - - if (scan.getStopRow().length == 0) { - boundedScan.withStopRow(sp[i + 1]); - } else { - boundedScan.withStopRow( - Bytes.compareTo(scan.getStopRow(), sp[i + 1]) < 0 ? scan.getStopRow() : sp[i + 1]); - } - - splits.add(new InputSplit(htd, hri, hosts, boundedScan, restoreDir)); + splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); } } - } else { - if ( - PrivateCellUtil.overlappingKeys(scan.getStartRow(), scan.getStopRow(), hri.getStartKey(), - hri.getEndKey()) - ) { - - splits.add(new InputSplit(htd, hri, hosts, scan, restoreDir)); - } + } + } finally { + if (connection != null) { + connection.close(); } } - return splits; } diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java index c24f8e62c816..3c1753421335 100644 --- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java +++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan.ReadType; @@ -47,6 +48,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat.TableSnapshotRegionSplit; import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper; +import org.apache.hadoop.hbase.snapshot.SnapshotManifest; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests; @@ -683,4 +685,49 @@ public void testReadFromRestoredSnapshotViaMR() throws Exception { } } } + + /** + * Tests that TableSnapshotInputFormatImpl correctly caches and uses region locations when + * locality is enabled + */ + @Test + public void testRegionLocatorUsesCache() throws Exception { + final TableName tableName = TableName.valueOf("testRegionLocatorCache"); + Configuration conf = UTIL.getConfiguration(); + try { + // Create table with multiple regions + createTableAndSnapshot(UTIL, tableName, "snapshot", getStartRow(), getEndRow(), 3); + + conf.setBoolean(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION, true); + + Path tmpTableDir = UTIL.getDataTestDirOnTestFS("snapshot"); + + // Get snapshot manifest and region info + SnapshotManifest manifest = TableSnapshotInputFormatImpl.getSnapshotManifest(conf, "snapshot", + CommonFSUtils.getRootDir(conf), UTIL.getTestFileSystem()); + List regionInfos = + TableSnapshotInputFormatImpl.getRegionInfosFromManifest(manifest); + + Scan scan = new Scan(); + + List splits = TableSnapshotInputFormatImpl.getSplits( + scan, manifest, regionInfos, tmpTableDir, conf, new RegionSplitter.UniformSplit(), 1); + + // Verify that splits contain proper locality information + Assert.assertNotNull(splits); + Assert.assertTrue(splits.size() > 0); + + // Verify locations are populated from cache + for (TableSnapshotInputFormatImpl.InputSplit split : splits) { + String[] locations = split.getLocations(); + // Locations should be populated from cache + Assert.assertNotNull(locations); + } + + } finally { + UTIL.getAdmin().deleteSnapshot("snapshot"); + UTIL.deleteTable(tableName); + conf.unset(SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION); + } + } }