Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 36 additions & 7 deletions core/src/main/java/org/apache/iceberg/rest/RESTTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,31 @@
*/
package org.apache.iceberg.rest;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.BatchScan;
import org.apache.iceberg.BatchScanAdapter;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.ImmutableTableScanContext;
import org.apache.iceberg.SupportsDistributedScanPlanning;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.StorageCredential;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.credentials.Credential;

class RESTTable extends BaseTable implements SupportsDistributedScanPlanning {
private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";

private final RESTClient client;
private final Supplier<Map<String, String>> headers;
private final MetricsReporter reporter;
Expand Down Expand Up @@ -65,18 +76,36 @@ class RESTTable extends BaseTable implements SupportsDistributedScanPlanning {

@Override
public TableScan newScan() {
BiFunction<List<Credential>, String, FileIO> fileIOFactory =
(credentials, scanPlanId) -> {
Map<String, String> props =
scanPlanId != null
? ImmutableMap.<String, String>builder()
.putAll(catalogProperties)
.put(RESTCatalogProperties.REST_SCAN_PLAN_ID, scanPlanId)
.buildKeepingLast()
: catalogProperties;
List<StorageCredential> storageCredentials =
credentials.stream()
.map(c -> StorageCredential.create(c.prefix(), c.config()))
.collect(Collectors.toList());
return CatalogUtil.loadFileIO(
catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL),
props,
hadoopConf,
storageCredentials);
};

TableResource tableResource =
new TableResource(resourcePaths, tableIdentifier, supportedEndpoints, fileIOFactory);

return new RESTTableScan(
this,
schema(),
ImmutableTableScanContext.builder().metricsReporter(reporter).build(),
client,
headers.get(),
operations(),
tableIdentifier,
resourcePaths,
supportedEndpoints,
catalogProperties,
hadoopConf);
headers,
tableResource);
}

@Override
Expand Down
115 changes: 36 additions & 79 deletions core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,20 @@
import com.github.benmanes.caffeine.cache.RemovalListener;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataTableScan;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.TableScanContext;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.StorageCredential;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.requests.PlanTableScanRequest;
Expand All @@ -61,7 +54,6 @@ class RESTTableScan extends DataTableScan {
private static final int MAX_RETRIES = 10; // Max number of poll retries
private static final long MAX_WAIT_TIME_MS = 5 * 60 * 1000; // Total maximum duration (5 minutes)
private static final double SCALE_FACTOR = 2.0; // Exponential scale factor
private static final String DEFAULT_FILE_IO_IMPL = "org.apache.iceberg.io.ResolvingFileIO";
private static final Cache<RESTTableScan, FileIO> FILEIO_TRACKER =
Caffeine.newBuilder()
.weakKeys()
Expand All @@ -75,14 +67,9 @@ class RESTTableScan extends DataTableScan {
.build();

private final RESTClient client;
private final Map<String, String> headers;
private final TableOperations operations;
private final ResourcePaths resourcePaths;
private final TableIdentifier tableIdentifier;
private final Set<Endpoint> supportedEndpoints;
private final ParserContext parserContext;
private final Map<String, String> catalogProperties;
private final Object hadoopConf;
private final Supplier<Map<String, String>> headers;
private final TableResource tableResource;
private volatile ParserContext lazyParserContext;
private String planId = null;
private FileIO scanFileIO = null;
private boolean useSnapshotSchema = false;
Expand All @@ -92,45 +79,31 @@ class RESTTableScan extends DataTableScan {
Schema schema,
TableScanContext context,
RESTClient client,
Map<String, String> headers,
TableOperations operations,
TableIdentifier tableIdentifier,
ResourcePaths resourcePaths,
Set<Endpoint> supportedEndpoints,
Map<String, String> catalogProperties,
Object hadoopConf) {
Supplier<Map<String, String>> headers,
TableResource tableResource) {
super(table, schema, context);
this.client = client;
this.headers = headers;
this.operations = operations;
this.tableIdentifier = tableIdentifier;
this.resourcePaths = resourcePaths;
this.supportedEndpoints = supportedEndpoints;
this.parserContext =
ParserContext.builder()
.add("specsById", table.specs())
.add("caseSensitive", context().caseSensitive())
.build();
this.catalogProperties = catalogProperties;
this.hadoopConf = hadoopConf;
this.tableResource = tableResource;
}

private ParserContext lazyParserContext() {
if (lazyParserContext == null) {
this.lazyParserContext =
ParserContext.builder()
.add("specsById", table().specs())
.add("caseSensitive", context().caseSensitive())
.build();
}
return lazyParserContext;
}

@Override
protected TableScan newRefinedScan(
Table refinedTable, Schema refinedSchema, TableScanContext refinedContext) {
RESTTableScan scan =
new RESTTableScan(
refinedTable,
refinedSchema,
refinedContext,
client,
headers,
operations,
tableIdentifier,
resourcePaths,
supportedEndpoints,
catalogProperties,
hadoopConf);
refinedTable, refinedSchema, refinedContext, client, headers, tableResource);
scan.useSnapshotSchema = useSnapshotSchema;
return scan;
}
Expand Down Expand Up @@ -197,13 +170,13 @@ public CloseableIterable<FileScanTask> planFiles() {
private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planTableScanRequest) {
PlanTableScanResponse response =
client.post(
resourcePaths.planTableScan(tableIdentifier),
tableResource.planPath(),
planTableScanRequest,
PlanTableScanResponse.class,
headers,
headers.get(),
ErrorHandlers.tableErrorHandler(),
stringStringMap -> {},
parserContext);
lazyParserContext());

this.planId = response.planId();
PlanStatus planStatus = response.planStatus();
Expand All @@ -214,7 +187,10 @@ private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planT
case COMPLETED:
return scanTasksIterable(response.planTasks(), response.fileScanTasks());
case SUBMITTED:
Endpoint.check(supportedEndpoints, Endpoint.V1_FETCH_TABLE_SCAN_PLAN);
Preconditions.checkState(
tableResource.supportsAsync(),
"Invalid plan status %s: server does not support async scan planning",
PlanStatus.SUBMITTED);
return fetchPlanningResult();
case FAILED:
throw new IllegalStateException(
Expand All @@ -228,22 +204,8 @@ private CloseableIterable<FileScanTask> planTableScan(PlanTableScanRequest planT
}
}

private FileIO scanFileIO(List<Credential> storageCredentials) {
ImmutableMap.Builder<String, String> builder =
ImmutableMap.<String, String>builder().putAll(catalogProperties);
if (null != planId) {
builder.put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId);
}

Map<String, String> properties = builder.buildKeepingLast();
FileIO ioForScan =
CatalogUtil.loadFileIO(
catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL),
properties,
hadoopConf,
storageCredentials.stream()
.map(c -> StorageCredential.create(c.prefix(), c.config()))
.collect(Collectors.toList()));
private FileIO scanFileIO(List<Credential> credentials) {
FileIO ioForScan = tableResource.createFileIO(credentials, planId);
FILEIO_TRACKER.put(this, ioForScan);
return ioForScan;
}
Expand All @@ -264,12 +226,12 @@ private CloseableIterable<FileScanTask> fetchPlanningResult() {
id -> {
FetchPlanningResultResponse response =
client.get(
resourcePaths.plan(tableIdentifier, id),
headers,
tableResource.planPath(id),
headers.get(),
FetchPlanningResultResponse.class,
headers,
headers.get(),
ErrorHandlers.planErrorHandler(),
parserContext);
lazyParserContext());

if (response.planStatus() == PlanStatus.SUBMITTED) {
throw new NotCompleteException();
Expand All @@ -292,20 +254,15 @@ private CloseableIterable<FileScanTask> fetchPlanningResult() {

private CloseableIterable<FileScanTask> scanTasksIterable(
List<String> planTasks, List<FileScanTask> fileScanTasks) {
if (planTasks != null && !planTasks.isEmpty()) {
Endpoint.check(supportedEndpoints, Endpoint.V1_FETCH_TABLE_SCAN_PLAN_TASKS);
}

return CloseableIterable.whenComplete(
new ScanTaskIterable(
planTasks,
fileScanTasks == null ? List.of() : fileScanTasks,
client,
resourcePaths,
tableIdentifier,
tableResource.fetchPath(),
headers,
planExecutor(),
parserContext),
lazyParserContext()),
this::cancelPlan);
}

Expand All @@ -319,16 +276,16 @@ private void cleanupPlanResources() {
@VisibleForTesting
@SuppressWarnings("checkstyle:RegexpMultiline")
public boolean cancelPlan() {
if (planId == null || !supportedEndpoints.contains(Endpoint.V1_CANCEL_TABLE_SCAN_PLAN)) {
if (planId == null || !tableResource.supportsCancel()) {
return false;
}

try {
client.delete(
resourcePaths.plan(tableIdentifier, planId),
tableResource.planPath(planId),
Map.of(),
null,
headers,
headers.get(),
ErrorHandlers.planErrorHandler());
this.planId = null;
return true;
Expand Down
19 changes: 8 additions & 11 deletions core/src/main/java/org/apache/iceberg/rest/ScanTaskIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.iceberg.BaseFileScanTask;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.rest.requests.FetchScanTasksRequest;
Expand All @@ -56,18 +56,16 @@ class ScanTaskIterable implements CloseableIterable<FileScanTask> {
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final ExecutorService executorService;
private final RESTClient client;
private final ResourcePaths resourcePaths;
private final TableIdentifier tableIdentifier;
private final Map<String, String> headers;
private final String fetchScanTasksPath;
private final Supplier<Map<String, String>> headers;
private final ParserContext parserContext;

ScanTaskIterable(
List<String> initialPlanTasks,
List<FileScanTask> initialFileScanTasks,
RESTClient client,
ResourcePaths resourcePaths,
TableIdentifier tableIdentifier,
Map<String, String> headers,
String fetchScanTasksPath,
Supplier<Map<String, String>> headers,
ExecutorService executorService,
ParserContext parserContext) {

Expand All @@ -77,8 +75,7 @@ class ScanTaskIterable implements CloseableIterable<FileScanTask> {
this.initialFileScanTasks = new ConcurrentLinkedQueue<>(initialFileScanTasks);

this.client = client;
this.resourcePaths = resourcePaths;
this.tableIdentifier = tableIdentifier;
this.fetchScanTasksPath = fetchScanTasksPath;
this.headers = headers;
this.executorService = executorService;
this.parserContext = parserContext;
Expand Down Expand Up @@ -221,10 +218,10 @@ private FetchScanTasksResponse fetchScanTasks(String planTask) {
FetchScanTasksRequest request = new FetchScanTasksRequest(planTask);

return client.post(
resourcePaths.fetchScanTasks(tableIdentifier),
fetchScanTasksPath,
request,
FetchScanTasksResponse.class,
headers,
headers.get(),
ErrorHandlers.planTaskHandler(),
stringStringMap -> {},
parserContext);
Expand Down
Loading
Loading