|
21 | 21 | import static org.apache.iceberg.MetadataColumns.DELETE_FILE_PATH; |
22 | 22 | import static org.apache.iceberg.MetadataColumns.DELETE_FILE_POS; |
23 | 23 | import static org.assertj.core.api.Assertions.assertThat; |
| 24 | +import static org.assertj.core.api.Assertions.assertThatThrownBy; |
| 25 | +import static org.assertj.core.api.Assumptions.assumeThat; |
24 | 26 |
|
25 | 27 | import java.io.IOException; |
26 | 28 | import java.nio.file.Path; |
27 | 29 | import java.util.Arrays; |
| 30 | +import java.util.Comparator; |
28 | 31 | import java.util.List; |
29 | 32 | import java.util.stream.Collectors; |
30 | 33 | import org.apache.iceberg.DataFile; |
|
38 | 41 | import org.apache.iceberg.encryption.EncryptedFiles; |
39 | 42 | import org.apache.iceberg.encryption.EncryptedOutputFile; |
40 | 43 | import org.apache.iceberg.encryption.EncryptionKeyMetadata; |
| 44 | +import org.apache.iceberg.exceptions.NotFoundException; |
| 45 | +import org.apache.iceberg.exceptions.ValidationException; |
| 46 | +import org.apache.iceberg.expressions.Expression; |
| 47 | +import org.apache.iceberg.expressions.Expressions; |
41 | 48 | import org.apache.iceberg.formats.FileWriterBuilder; |
42 | 49 | import org.apache.iceberg.formats.FormatModelRegistry; |
43 | 50 | import org.apache.iceberg.inmemory.InMemoryFileIO; |
|
46 | 53 | import org.apache.iceberg.io.DeleteSchemaUtil; |
47 | 54 | import org.apache.iceberg.io.InputFile; |
48 | 55 | import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; |
| 56 | +import org.apache.iceberg.types.Types; |
49 | 57 | import org.junit.jupiter.api.AfterEach; |
50 | 58 | import org.junit.jupiter.api.BeforeEach; |
51 | 59 | import org.junit.jupiter.api.io.TempDir; |
@@ -89,7 +97,12 @@ void before() { |
89 | 97 |
|
90 | 98 | @AfterEach |
91 | 99 | void after() { |
92 | | - fileIO.deleteFile(encryptedFile.encryptingOutputFile()); |
| 100 | + try { |
| 101 | + fileIO.deleteFile(encryptedFile.encryptingOutputFile()); |
| 102 | + } catch (NotFoundException ignored) { |
| 103 | + // ignore if the file was never created
| 104 | + } |
| 105 | + |
93 | 106 | this.encryptedFile = null; |
94 | 107 | if (fileIO != null) { |
95 | 108 | fileIO.close(); |
@@ -146,25 +159,9 @@ void testDataWriterEngineWriteGenericRead(FileFormat fileFormat, DataGenerator d |
146 | 159 | void testDataWriterGenericWriteEngineRead(FileFormat fileFormat, DataGenerator dataGenerator) |
147 | 160 | throws IOException { |
148 | 161 | Schema schema = dataGenerator.schema(); |
149 | | - FileWriterBuilder<DataWriter<Record>, Object> writerBuilder = |
150 | | - FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile); |
151 | | - |
152 | | - DataWriter<Record> writer = |
153 | | - writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build(); |
154 | 162 |
|
155 | 163 | List<Record> genericRecords = dataGenerator.generateRecords(); |
156 | | - |
157 | | - try (writer) { |
158 | | - for (Record record : genericRecords) { |
159 | | - writer.write(record); |
160 | | - } |
161 | | - } |
162 | | - |
163 | | - DataFile dataFile = writer.toDataFile(); |
164 | | - |
165 | | - assertThat(dataFile).isNotNull(); |
166 | | - assertThat(dataFile.recordCount()).isEqualTo(genericRecords.size()); |
167 | | - assertThat(dataFile.format()).isEqualTo(fileFormat); |
| 164 | + writeGenericRecords(fileFormat, schema, genericRecords); |
168 | 165 |
|
169 | 166 | // Read back and verify |
170 | 167 | InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); |
@@ -317,6 +314,305 @@ void testPositionDeleteWriterEngineWriteGenericRead(FileFormat fileFormat) throw |
317 | 314 | DataTestHelpers.assertEquals(positionDeleteSchema.asStruct(), records, readRecords); |
318 | 315 | } |
319 | 316 |
|
| 317 | + /** Write with Generic Record, read with projected engine type T (narrow schema). */
| 318 | + @ParameterizedTest
| 319 | + @FieldSource("FORMAT_AND_GENERATOR")
| 320 | + void testReaderBuilderProjection(FileFormat fileFormat, DataGenerator dataGenerator) |
| 321 | + throws IOException { |
| 322 | + Schema fullSchema = dataGenerator.schema(); |
| 323 | + |
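| | + // Project only the last column of the full schema.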
| 324 | + List<Types.NestedField> columns = fullSchema.columns(); |
| 325 | + Schema projectedSchema = new Schema(columns.get(columns.size() - 1)); |
| 326 | + |
| 327 | + List<Record> genericRecords = dataGenerator.generateRecords(); |
| 328 | + writeGenericRecords(fileFormat, fullSchema, genericRecords); |
| 329 | + |
| 330 | + List<Record> projectedGenericRecords = projectRecords(genericRecords, projectedSchema); |
| 331 | + List<T> expectedEngineRecords = |
| 332 | + convertToEngineRecords(projectedGenericRecords, projectedSchema); |
| 333 | + |
| 334 | + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); |
| 335 | + List<T> readRecords; |
| 336 | + try (CloseableIterable<T> reader = |
| 337 | + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) |
| 338 | + .project(projectedSchema) |
| 339 | + .engineProjection(engineSchema(projectedSchema)) |
| 340 | + .build()) { |
| 341 | + readRecords = ImmutableList.copyOf(reader); |
| 342 | + } |
| 343 | + |
| 344 | + assertEquals(projectedSchema, expectedEngineRecords, readRecords); |
| 345 | + } |
| 346 | + |
| 347 | + @ParameterizedTest |
| 348 | + @FieldSource("FORMAT_AND_GENERATOR") |
| 349 | + void testReaderBuilderFilter(FileFormat fileFormat, DataGenerator dataGenerator) |
| 350 | + throws IOException { |
| 351 | + |
| 352 | + // Avro does not support filter push down.
| 353 | + // Skip this test for Avro to avoid false failures. |
| 354 | + assumeThat(fileFormat != FileFormat.AVRO).isTrue(); |
| 355 | + |
| 356 | + Schema schema = dataGenerator.schema(); |
| 357 | + |
| 358 | + List<Record> genericRecords = dataGenerator.generateRecords(); |
| 359 | + writeGenericRecords(fileFormat, schema, genericRecords); |
| 360 | + |
| 361 | + // Construct a filter on values below the column minimum so that no row can match and the
| 362 | + // file can be skipped entirely.
| 363 | + Types.NestedField firstField = schema.columns().get(0);
| 364 | + Expression filter = filterFieldExpression(firstField, genericRecords);
| 365 | + |
| 366 | + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); |
| 367 | + List<T> readRecords; |
| 368 | + try (CloseableIterable<T> reader = |
| 369 | + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) |
| 370 | + .project(schema) |
| 371 | + .engineProjection(engineSchema(schema)) |
| 372 | + .filter(filter) |
| 373 | + .build()) { |
| 374 | + readRecords = ImmutableList.copyOf(reader); |
| 375 | + } |
| 376 | + |
| 377 | + assertThat(readRecords).isEmpty(); |
| 378 | + } |
| 379 | + |
| 380 | + /**
| 381 | + * Write with Generic Record, then read using an upper-cased column name in the filter to
| 382 | + * verify caseSensitive behavior.
| 383 | + */
| 384 | + @ParameterizedTest
| 385 | + @FieldSource("FORMAT_AND_GENERATOR")
| 386 | + void testReaderBuilderCaseSensitive(FileFormat fileFormat, DataGenerator dataGenerator) |
| 387 | + throws IOException { |
| 388 | + |
| 389 | + // Avro does not support filter push down; caseSensitive has no effect on it. |
| 390 | + // Skip this test for Avro to avoid false failures. |
| 391 | + assumeThat(fileFormat != FileFormat.AVRO).isTrue(); |
| 392 | + |
| 393 | + Schema schema = dataGenerator.schema(); |
| 394 | + |
| 395 | + List<Record> genericRecords = dataGenerator.generateRecords(); |
| 396 | + writeGenericRecords(fileFormat, schema, genericRecords); |
| 397 | + |
| 398 | + // Build a filter using the upper-cased name of the first column. |
| 399 | + Types.NestedField firstField = schema.columns().get(0); |
| 400 | + Object filterValue = genericRecords.get(0).getField(firstField.name()); |
| 401 | + Expression upperCaseFilter = Expressions.equal(firstField.name().toUpperCase(), filterValue); |
| 402 | + |
| 403 | + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); |
| 404 | + |
| 405 | + // caseSensitive=false: upper-cased column name must be resolved correctly. |
| 406 | + List<T> readRecords; |
| 407 | + try (CloseableIterable<T> reader = |
| 408 | + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) |
| 409 | + .project(schema) |
| 410 | + .engineProjection(engineSchema(schema)) |
| 411 | + .filter(upperCaseFilter) |
| 412 | + .caseSensitive(false) |
| 413 | + .build()) { |
| 414 | + readRecords = ImmutableList.copyOf(reader); |
| 415 | + } |
| 416 | + |
| 417 | + assertThat(readRecords).isNotEmpty(); |
| 418 | + |
| 419 | + // caseSensitive=true: upper-cased column name cannot be resolved → must throw. |
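| | + // Depending on the format, binding may fail in build() or lazily during iteration, so both
| | + // run inside the lambda.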
| 420 | + assertThatThrownBy( |
| 421 | + () -> { |
| 422 | + try (CloseableIterable<T> reader = |
| 423 | + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) |
| 424 | + .project(schema) |
| 425 | + .engineProjection(engineSchema(schema)) |
| 426 | + .filter(upperCaseFilter) |
| 427 | + .caseSensitive(true) |
| 428 | + .build()) { |
| 429 | + ImmutableList.copyOf(reader); |
| 430 | + } |
| 431 | + }) |
| 432 | + .isInstanceOf(ValidationException.class); |
| 433 | + } |
| 434 | + |
| 435 | + /**
| 436 | + * Write with Generic Record, then read using split to verify that the split range is
| 437 | + * respected. Reading with a zero-length split at the end of the file should return no records,
| 438 | + * while reading with the full file range should return all records.
| 439 | + */
| 440 | + @ParameterizedTest
| 441 | + @FieldSource("FORMAT_AND_GENERATOR")
| 442 | + void testReaderBuilderSplit(FileFormat fileFormat, DataGenerator dataGenerator) |
| 443 | + throws IOException { |
| 444 | + Schema schema = dataGenerator.schema(); |
| 445 | + |
| 446 | + List<Record> genericRecords = dataGenerator.generateRecords(); |
| 447 | + writeGenericRecords(fileFormat, schema, genericRecords); |
| 448 | + |
| 449 | + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); |
| 450 | + long fileLength = inputFile.getLength(); |
| 451 | + |
| 452 | + // split(fileLength, 0): empty range at the end of the file → no records should be returned |
| 453 | + List<T> emptyReadRecords; |
| 454 | + try (CloseableIterable<T> reader = |
| 455 | + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) |
| 456 | + .project(schema) |
| 457 | + .engineProjection(engineSchema(schema)) |
| 458 | + .split(fileLength, 0) |
| 459 | + .build()) { |
| 460 | + emptyReadRecords = ImmutableList.copyOf(reader); |
| 461 | + } |
| 462 | + |
| 463 | + assertThat(emptyReadRecords).isEmpty(); |
| 464 | + |
| 465 | + // split(0, fileLength): full file range → all records should be returned |
| 466 | + List<T> fullReadRecords; |
| 467 | + try (CloseableIterable<T> reader = |
| 468 | + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) |
| 469 | + .project(schema) |
| 470 | + .engineProjection(engineSchema(schema)) |
| 471 | + .split(0, fileLength) |
| 472 | + .build()) { |
| 473 | + fullReadRecords = ImmutableList.copyOf(reader); |
| 474 | + } |
| 475 | + |
| 476 | + assertEquals(schema, convertToEngineRecords(genericRecords, schema), fullReadRecords); |
| 477 | + } |
| 478 | + |
| 479 | + /**
| 480 | + * Verifies the contract of recordsPerBatch: recordsPerBatch is a hint for vectorized readers.
| 481 | + * The total number of records returned must be unaffected regardless of the batch size value.
| 482 | + */
| 483 | + @ParameterizedTest
| 484 | + @FieldSource("FORMAT_AND_GENERATOR")
| 485 | + void testReaderBuilderRecordsPerBatch(FileFormat fileFormat, DataGenerator dataGenerator) |
| 486 | + throws IOException { |
| 487 | + |
| 488 | + // Avro does not support batch reading. |
| 489 | + assumeThat(fileFormat != FileFormat.AVRO).isTrue(); |
| 490 | + |
| 491 | + Schema schema = dataGenerator.schema(); |
| 492 | + |
| 493 | + List<Record> genericRecords = dataGenerator.generateRecords(); |
| 494 | + writeGenericRecords(fileFormat, schema, genericRecords); |
| 495 | + |
| 496 | + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); |
| 497 | + List<T> expectedEngineRecords = convertToEngineRecords(genericRecords, schema); |
| 498 | + |
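| | + // A batch size of 1 forces one batch per record, while a size larger than the record count
| | + // yields a single batch; both reads must return the same records.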
| 499 | + List<T> smallBatchRecords; |
| 500 | + try (CloseableIterable<T> reader = |
| 501 | + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) |
| 502 | + .project(schema) |
| 503 | + .engineProjection(engineSchema(schema)) |
| 504 | + .recordsPerBatch(1) |
| 505 | + .build()) { |
| 506 | + smallBatchRecords = ImmutableList.copyOf(reader); |
| 507 | + } |
| 508 | + |
| 509 | + assertEquals(schema, expectedEngineRecords, smallBatchRecords); |
| 510 | + |
| 511 | + List<T> largeBatchRecords; |
| 512 | + try (CloseableIterable<T> reader = |
| 513 | + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) |
| 514 | + .project(schema) |
| 515 | + .engineProjection(engineSchema(schema)) |
| 516 | + .recordsPerBatch(genericRecords.size() + 1) |
| 517 | + .build()) { |
| 518 | + largeBatchRecords = ImmutableList.copyOf(reader); |
| 519 | + } |
| 520 | + |
| 521 | + assertEquals(schema, expectedEngineRecords, largeBatchRecords); |
| 522 | + } |
| 523 | + |
| 524 | + /** Verifies the contract of reuseContainers. */
| 525 | + @ParameterizedTest
| 526 | + @FieldSource("FORMAT_AND_GENERATOR")
| 527 | + void testReaderBuilderReuseContainers(FileFormat fileFormat, DataGenerator dataGenerator) |
| 528 | + throws IOException { |
| 529 | + |
| 530 | + // ORC does not support reuseContainers.
| 531 | + assumeThat(fileFormat != FileFormat.ORC).isTrue(); |
| 532 | + |
| 533 | + Schema schema = dataGenerator.schema(); |
| 534 | + |
| 535 | + List<Record> genericRecords = dataGenerator.generateRecords(); |
| 536 | + // Need at least 2 records to verify container reuse |
| 537 | + assumeThat(genericRecords.size() >= 2).isTrue(); |
| 538 | + writeGenericRecords(fileFormat, schema, genericRecords); |
| 539 | + |
| 540 | + InputFile inputFile = encryptedFile.encryptingOutputFile().toInputFile(); |
| 541 | + |
| 542 | + // Without reuseContainers: every record must be a distinct object instance |
| 543 | + List<T> noReuseRecords; |
| 544 | + try (CloseableIterable<T> reader = |
| 545 | + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) |
| 546 | + .project(schema) |
| 547 | + .engineProjection(engineSchema(schema)) |
| 548 | + .build()) { |
| 549 | + noReuseRecords = ImmutableList.copyOf(reader); |
| 550 | + } |
| 551 | + |
| 552 | + for (int i = 0; i < noReuseRecords.size() - 1; i++) { |
| 553 | + assertThat(noReuseRecords.get(i)).isNotSameAs(noReuseRecords.get(i + 1)); |
| 554 | + } |
| 555 | + |
| 556 | + // With reuseContainers: all collected elements must be the same object instance |
| 557 | + List<T> reuseRecords; |
| 558 | + try (CloseableIterable<T> reader = |
| 559 | + FormatModelRegistry.readBuilder(fileFormat, engineType(), inputFile) |
| 560 | + .project(schema) |
| 561 | + .engineProjection(engineSchema(schema)) |
| 562 | + .reuseContainers() |
| 563 | + .build()) { |
| 564 | + reuseRecords = ImmutableList.copyOf(reader); |
| 565 | + } |
| 566 | + |
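| | + // The reader hands back one reused container, so every collected element is the same
| | + // instance (left holding the last record's values).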
| 567 | + T first = reuseRecords.get(0); |
| 568 | + for (int i = 1; i < reuseRecords.size(); i++) { |
| 569 | + assertThat(reuseRecords.get(i)).isSameAs(first); |
| 570 | + } |
| 571 | + } |
| 572 | + |
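| | + /**
| | + * Writes the records with the generic Record writer and verifies the resulting DataFile
| | + * metadata (record count and format).
| | + */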
| 573 | + private void writeGenericRecords(FileFormat fileFormat, Schema schema, List<Record> records) |
| 574 | + throws IOException { |
| 575 | + FileWriterBuilder<DataWriter<Record>, Object> writerBuilder = |
| 576 | + FormatModelRegistry.dataWriteBuilder(fileFormat, Record.class, encryptedFile); |
| 577 | + |
| 578 | + DataWriter<Record> writer = |
| 579 | + writerBuilder.schema(schema).spec(PartitionSpec.unpartitioned()).build(); |
| 580 | + |
| 581 | + try (writer) { |
| 582 | + for (Record record : records) { |
| 583 | + writer.write(record); |
| 584 | + } |
| 585 | + } |
| 586 | + |
| 587 | + DataFile dataFile = writer.toDataFile(); |
| 588 | + assertThat(dataFile).isNotNull(); |
| 589 | + assertThat(dataFile.recordCount()).isEqualTo(records.size()); |
| 590 | + assertThat(dataFile.format()).isEqualTo(fileFormat); |
| 591 | + } |
| 592 | + |
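| | + /** Copies only the fields of projectedSchema from each record into a new GenericRecord. */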
| 593 | + private List<Record> projectRecords(List<Record> records, Schema projectedSchema) { |
| 594 | + return records.stream() |
| 595 | + .map( |
| 596 | + record -> { |
| 597 | + Record projected = GenericRecord.create(projectedSchema.asStruct()); |
| 598 | + for (Types.NestedField field : projectedSchema.columns()) { |
| 599 | + projected.setField(field.name(), record.getField(field.name())); |
| 600 | + } |
| 601 | + return projected; |
| 602 | + }) |
| 603 | + .collect(Collectors.toList()); |
| 604 | + } |
| 605 | + |
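| | + /**
| | + * Builds a lessThan predicate on the column minimum so that the filter cannot match any row
| | + * and the whole file can be skipped.
| | + */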
| 606 | + private Expression filterFieldExpression(
| 607 | + Types.NestedField firstField, List<Record> records) {
| 608 | + Object minValue = |
| 609 | + records.stream() |
| 610 | + .map(r -> (Comparable) r.getField(firstField.name())) |
| 611 | + .min(Comparator.naturalOrder()) |
| 612 | + .orElseThrow();
| 613 | + return Expressions.lessThan(firstField.name(), minValue); |
| 614 | + } |
| 615 | + |
320 | 616 | private List<T> convertToEngineRecords(List<Record> records, Schema schema) { |
321 | 617 | return records.stream().map(r -> convertToEngine(r, schema)).collect(Collectors.toList()); |
322 | 618 | } |
|