Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
import static org.junit.jupiter.api.Assumptions.assumeFalse;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -72,6 +73,10 @@ public abstract class BaseFormatModelTests<T> {

protected abstract void assertEquals(Schema schema, List<T> expected, List<T> actual);

/**
 * Whether the engine under test supports vectorized (batch) reads.
 *
 * <p>Defaults to {@code false}. Subclasses for engines with a batch reader presumably override
 * this; the batch-related tests use it to decide whether to run — TODO confirm against subclasses.
 */
protected boolean supportsBatchReads() {
  return false;
}

private static final FileFormat[] FILE_FORMATS =
new FileFormat[] {FileFormat.AVRO, FileFormat.PARQUET, FileFormat.ORC};

Expand All @@ -85,16 +90,13 @@ public abstract class BaseFormatModelTests<T> {

static final String FEATURE_FILTER = "filter";
static final String FEATURE_CASE_SENSITIVE = "caseSensitive";
static final String FEATURE_RECORDS_PER_BATCH = "recordsPerBatch";
static final String FEATURE_SPLIT = "split";
static final String FEATURE_REUSE_CONTAINERS = "reuseContainers";

private static final Map<FileFormat, String[]> MISSING_FEATURES =
Map.of(
FileFormat.AVRO,
new String[] {
FEATURE_FILTER, FEATURE_CASE_SENSITIVE, FEATURE_RECORDS_PER_BATCH, FEATURE_SPLIT
},
new String[] {FEATURE_FILTER, FEATURE_CASE_SENSITIVE, FEATURE_SPLIT},
FileFormat.ORC,
new String[] {FEATURE_REUSE_CONTAINERS});

Expand Down Expand Up @@ -607,6 +609,25 @@ void testReaderBuilderReuseContainers(FileFormat fileFormat) throws IOException
reuseRecords.forEach(r -> assertThat(r).isSameAs(reuseRecords.get(0)));
}

@ParameterizedTest
@FieldSource("FILE_FORMATS")
void testReaderBuilderRecordsPerBatchNotSupported(FileFormat fileFormat) throws IOException {
  // Only meaningful for engines without a vectorized reader; skip otherwise.
  assumeFalse(supportsBatchReads(), engineType().getSimpleName() + " supports batch reads");

  // Write a data file so the read builder has a real input to open.
  DataGenerator generator = new DataGenerators.DefaultSchema();
  Schema schema = generator.schema();
  List<Record> records = generator.generateRecords();
  writeGenericRecords(fileFormat, schema, records);

  InputFile input = encryptedFile.encryptingOutputFile().toInputFile();

  // Requesting a batch size on a non-vectorized reader must be rejected.
  assertThatThrownBy(
          () -> FormatModelRegistry.readBuilder(fileFormat, engineType(), input).recordsPerBatch(100))
      .isInstanceOf(UnsupportedOperationException.class)
      .hasMessageContaining("Batch reading is not supported");
}

private void readAndAssertGenericRecords(
FileFormat fileFormat, Schema schema, List<Record> expected) throws IOException {
InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile();
Expand Down
5 changes: 5 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/ORCFormatModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,11 @@ public ReadBuilder<D, S> reuseContainers() {

@Override
public ReadBuilder<D, S> recordsPerBatch(int numRowsPerBatch) {
  // Batch sizing only applies to the vectorized reader; delegate when available.
  if (isBatchReader) {
    internal.recordsPerBatch(numRowsPerBatch);
    return this;
  }

  throw new UnsupportedOperationException(
      "Batch reading is not supported in non-vectorized reader");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ public ReadBuilder<D, S> reuseContainers() {

@Override
public ReadBuilder<D, S> recordsPerBatch(int numRowsPerBatch) {
  // Row-group batch size is a vectorized-read concept; reject it for row-based readers.
  if (isBatchReader) {
    internal.recordsPerBatch(numRowsPerBatch);
    return this;
  }

  throw new UnsupportedOperationException(
      "Batch reading is not supported in non-vectorized reader");
}
Expand Down
Loading