diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java index 3f35135f5125..60d126be21f8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatWriter.java @@ -20,6 +20,8 @@ import org.apache.paimon.data.InternalRow; +import javax.annotation.Nullable; + import java.io.Closeable; import java.io.IOException; @@ -49,4 +51,19 @@ public interface FormatWriter extends Closeable { * @throws IOException Thrown if calculating the length fails. */ boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException; + + /** + * Returns format-specific writer metadata that can be used to extract statistics without + * re-reading the file. This is useful for object stores (like OSS/S3) where the file may not be + * immediately visible after close. + * + *

This method should only be called after {@link #close()}. By default, returns {@code null} + * indicating no in-memory metadata is available. + * + * @return format-specific metadata object, or {@code null} if not available. + */ + @Nullable + default Object writerMetadata() { + return null; + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/SimpleStatsExtractor.java b/paimon-common/src/main/java/org/apache/paimon/format/SimpleStatsExtractor.java index 9ffbf495e5bf..41ff84dd7c89 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/SimpleStatsExtractor.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/SimpleStatsExtractor.java @@ -22,6 +22,8 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.utils.Pair; +import javax.annotation.Nullable; + import java.io.IOException; /** Extracts statistics directly from file. */ @@ -29,6 +31,24 @@ public interface SimpleStatsExtractor { SimpleColStats[] extract(FileIO fileIO, Path path, long length) throws IOException; + /** + * Extract statistics using optional in-memory writer metadata to avoid re-reading the file. + * When writerMetadata is non-null and the extractor supports it, the stats will be extracted + * from memory instead of from the file. This avoids issues with object stores where the file + * may not be immediately visible after close. + * + * @param fileIO the file IO + * @param path the file path + * @param length the file length + * @param writerMetadata optional format-specific metadata from the writer, or null + * @return column statistics + */ + default SimpleColStats[] extract( + FileIO fileIO, Path path, long length, @Nullable Object writerMetadata) + throws IOException { + return extract(fileIO, path, length); + } + Pair extractWithFileInfo(FileIO fileIO, Path path, long length) throws IOException; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SimpleStatsProducer.java b/paimon-core/src/main/java/org/apache/paimon/io/SimpleStatsProducer.java index 1e818ab9d099..e33dcfc46a8f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/SimpleStatsProducer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/SimpleStatsProducer.java @@ -40,6 +40,17 @@ public interface SimpleStatsProducer { SimpleColStats[] extract(FileIO fileIO, Path path, long length) throws IOException; + /** + * Extract statistics using optional in-memory writer metadata. + * + * @param writerMetadata optional format-specific metadata from the writer, or null + */ + default SimpleColStats[] extract( + FileIO fileIO, Path path, long length, @Nullable Object writerMetadata) + throws IOException { + return extract(fileIO, path, length); + } + static SimpleStatsProducer disabledProducer() { return new SimpleStatsProducer() { @@ -92,6 +103,13 @@ public SimpleColStats[] extract(FileIO fileIO, Path path, long length) throws IOException { return extractor.extract(fileIO, path, length); } + + @Override + public SimpleColStats[] extract( + FileIO fileIO, Path path, long length, @Nullable Object writerMetadata) + throws IOException { + return extractor.extract(fileIO, path, length, writerMetadata); + } }; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java index aea24be3cac0..473acecea9ed 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java @@ -59,6 +59,7 @@ public abstract class SingleFileWriter implements FileWriter { @Nullable private PositionOutputStream out; @Nullable private Long outputBytes; + @Nullable private Object writerMetadata; private long recordCount; protected boolean closed; @@ -193,6 +194,7 @@ public void close() throws IOException { try { if (writer != null) { writer.close(); + writerMetadata = writer.writerMetadata(); writer = null; } if (out != null) { @@ -216,4 +218,13 @@ protected long outputBytes() throws IOException { } return outputBytes; } + + /** + * Returns cached writer metadata from the format writer. Available after {@link #close()} is + * called. Can be used by stats extractors to avoid re-reading the file from object storage. + */ + @Nullable + protected Object writerMetadata() { + return writerMetadata; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java index 20cebfcb271d..773ca08a815f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/StatsCollectingSingleFileWriter.java @@ -83,6 +83,6 @@ public SimpleColStats[] fieldStats(long fileSize) throws IOException { .toArray(SimpleColStats[]::new); } - return statsProducer.extract(fileIO, path, fileSize); + return statsProducer.extract(fileIO, path, fileSize, writerMetadata()); } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java index a265b76d4c31..f286cfce5dfe 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSimpleStatsExtractor.java @@ -42,8 +42,11 @@ import org.apache.parquet.column.statistics.IntStatistics; import org.apache.parquet.column.statistics.LongStatistics; import org.apache.parquet.column.statistics.Statistics; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.PrimitiveType; +import javax.annotation.Nullable; + import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; @@ -74,6 +77,29 @@ public SimpleColStats[] extract(FileIO fileIO, Path path, long length) throws IO return extractWithFileInfo(fileIO, path, length).getLeft(); } + @Override + public SimpleColStats[] extract( + FileIO fileIO, Path path, long length, @Nullable Object writerMetadata) + throws IOException { + if (writerMetadata instanceof ParquetMetadata) { + // Use in-memory metadata directly, avoiding re-reading the file. + // This is critical for object stores (OSS/S3) where the file may not be + // immediately visible after close. + Map> columnStats = + ParquetUtil.extractColumnStats((ParquetMetadata) writerMetadata); + SimpleColStatsCollector[] collectors = SimpleColStatsCollector.create(statsCollectors); + return IntStream.range(0, rowType.getFieldCount()) + .mapToObj( + i -> { + DataField field = rowType.getFields().get(i); + return toFieldStats( + field, columnStats.get(field.name()), collectors[i]); + }) + .toArray(SimpleColStats[]::new); + } + return extract(fileIO, path, length); + } + @Override public Pair extractWithFileInfo( FileIO fileIO, Path path, long length) throws IOException { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java index f3c358c37571..575252095d3d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java @@ -54,25 +54,38 @@ public class ParquetUtil { throws IOException { try (ParquetFileReader reader = getParquetReader(fileIO, path, length, options)) { ParquetMetadata parquetMetadata = reader.getFooter(); - List blockMetaDataList = parquetMetadata.getBlocks(); - Map> resultStats = new HashMap<>(); - for (BlockMetaData blockMetaData : blockMetaDataList) { - List columnChunkMetaDataList = blockMetaData.getColumns(); - for (ColumnChunkMetaData columnChunkMetaData : columnChunkMetaDataList) { - Statistics stats = columnChunkMetaData.getStatistics(); - String columnName = columnChunkMetaData.getPath().toDotString(); - Statistics midStats; - if (!resultStats.containsKey(columnName)) { - midStats = stats; - } else { - midStats = resultStats.get(columnName); - midStats.mergeStatistics(stats); - } - resultStats.put(columnName, midStats); + Map> resultStats = extractColumnStats(parquetMetadata); + return Pair.of(resultStats, new SimpleStatsExtractor.FileInfo(reader.getRecordCount())); + } + } + + /** + * Extract column stats from in-memory {@link ParquetMetadata}. This avoids re-reading the file + * from storage, which is critical for object stores (like OSS/S3) where the file may not be + * immediately visible after close. + * + * @param parquetMetadata the in-memory Parquet metadata (footer) + * @return result sets as map, key is column name, value is statistics + */ + public static Map> extractColumnStats(ParquetMetadata parquetMetadata) { + List blockMetaDataList = parquetMetadata.getBlocks(); + Map> resultStats = new HashMap<>(); + for (BlockMetaData blockMetaData : blockMetaDataList) { + List columnChunkMetaDataList = blockMetaData.getColumns(); + for (ColumnChunkMetaData columnChunkMetaData : columnChunkMetaDataList) { + Statistics stats = columnChunkMetaData.getStatistics(); + String columnName = columnChunkMetaData.getPath().toDotString(); + Statistics midStats; + if (!resultStats.containsKey(columnName)) { + midStats = stats; + } else { + midStats = resultStats.get(columnName); + midStats.mergeStatistics(stats); } + resultStats.put(columnName, midStats); } - return Pair.of(resultStats, new SimpleStatsExtractor.FileInfo(reader.getRecordCount())); } + return resultStats; } /** diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java index ac11a2ef002d..d7282f699f1e 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/writer/ParquetBulkWriter.java @@ -22,6 +22,9 @@ import org.apache.paimon.format.FormatWriter; import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; + +import javax.annotation.Nullable; import java.io.IOException; @@ -33,6 +36,9 @@ public class ParquetBulkWriter implements FormatWriter { /** The ParquetWriter to write to. */ private final ParquetWriter parquetWriter; + /** Cached footer metadata after close, used to avoid re-reading the file for stats. */ + @Nullable private ParquetMetadata footerMetadata; + /** * Creates a new ParquetBulkWriter wrapping the given ParquetWriter. * @@ -50,10 +56,17 @@ public void addElement(InternalRow datum) throws IOException { @Override public void close() throws IOException { parquetWriter.close(); + this.footerMetadata = parquetWriter.getFooter(); } @Override public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException { return suggestedCheck && parquetWriter.getDataSize() >= targetSize; } + + @Nullable + @Override + public Object writerMetadata() { + return footerMetadata; + } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetInMemoryStatsTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetInMemoryStatsTest.java new file mode 100644 index 000000000000..f0d8d34f43b5 --- /dev/null +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetInMemoryStatsTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.format.parquet; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.format.FileFormat; +import org.apache.paimon.format.FormatWriter; +import org.apache.paimon.format.FormatWriterFactory; +import org.apache.paimon.format.SimpleColStats; +import org.apache.paimon.format.SimpleStatsExtractor; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.PositionOutputStream; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.options.Options; +import org.apache.paimon.statistics.SimpleColStatsCollector; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.BooleanType; +import org.apache.paimon.types.DateType; +import org.apache.paimon.types.DecimalType; +import org.apache.paimon.types.DoubleType; +import org.apache.paimon.types.FloatType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.types.SmallIntType; +import org.apache.paimon.types.TinyIntType; +import org.apache.paimon.types.VarCharType; + +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.math.BigDecimal; +import java.util.stream.IntStream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests that verify extracting column stats from in-memory ParquetMetadata produces the same + * results as extracting from the file. This is critical for object stores (like OSS/S3) where the + * file may not be immediately visible after close. + */ +public class ParquetInMemoryStatsTest { + + @TempDir java.nio.file.Path tempDir; + + private final FileIO fileIO = new LocalFileIO(); + + @Test + public void testInMemoryStatsMatchFileStats() throws Exception { + RowType rowType = + RowType.builder() + .fields( + new VarCharType(100), + new BooleanType(), + new TinyIntType(), + new SmallIntType(), + new IntType(), + new BigIntType(), + new FloatType(), + new DoubleType(), + new DecimalType(10, 2), + new DateType()) + .build(); + + FileFormat format = FileFormat.fromIdentifier("parquet", new Options()); + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + Path path = new Path(tempDir.toString() + "/test_inmemory_stats.parquet"); + + // Write test data + PositionOutputStream out = fileIO.newOutputStream(path, false); + FormatWriter writer = writerFactory.create(out, "SNAPPY"); + + for (int i = 0; i < 100; i++) { + GenericRow row = new GenericRow(10); + row.setField(0, BinaryString.fromString("value_" + i)); + row.setField(1, i % 2 == 0); + row.setField(2, (byte) (i % 127)); + row.setField(3, (short) (i * 10)); + row.setField(4, i * 100); + row.setField(5, (long) i * 1000); + // Start from 1 to avoid -0.0 vs 0.0 IEEE 754 representation difference + // between in-memory and serialized Parquet statistics + row.setField(6, (i + 1) * 1.1f); + row.setField(7, (i + 1) * 2.2); + row.setField(8, Decimal.fromBigDecimal(new BigDecimal(i + ".99"), 10, 2)); + row.setField(9, 18000 + i); // date as days since epoch + writer.addElement(row); + } + + // Also add some nulls + for (int i = 0; i < 10; i++) { + GenericRow row = new GenericRow(10); + // Leave all fields null + writer.addElement(row); + } + + writer.close(); + out.close(); + + // Get in-memory metadata from writer + Object writerMetadata = writer.writerMetadata(); + assertThat(writerMetadata).isNotNull(); + assertThat(writerMetadata).isInstanceOf(ParquetMetadata.class); + + // Create stats extractors + int fieldCount = rowType.getFieldCount(); + SimpleColStatsCollector.Factory[] statsCollectors = + IntStream.range(0, fieldCount) + .mapToObj(p -> SimpleColStatsCollector.from("full")) + .toArray(SimpleColStatsCollector.Factory[]::new); + SimpleStatsExtractor extractor = + format.createStatsExtractor(rowType, statsCollectors).get(); + + // Extract stats from file (original path) + long fileSize = fileIO.getFileSize(path); + SimpleColStats[] fromFile = extractor.extract(fileIO, path, fileSize); + + // Extract stats from in-memory metadata (new path) + SimpleColStats[] fromMemory = extractor.extract(fileIO, path, fileSize, writerMetadata); + + // They should be exactly the same + assertThat(fromMemory).isEqualTo(fromFile); + + // Also verify the stats are correct + assertThat(fromFile[0].min()).isEqualTo(BinaryString.fromString("value_0")); + assertThat(fromFile[0].max()).isEqualTo(BinaryString.fromString("value_99")); + assertThat(fromFile[0].nullCount()).isEqualTo(10L); + + assertThat(fromFile[4].min()).isEqualTo(0); + assertThat(fromFile[4].max()).isEqualTo(9900); + assertThat(fromFile[4].nullCount()).isEqualTo(10L); + } + + @Test + public void testInMemoryStatsFallbackWhenMetadataIsNull() throws Exception { + RowType rowType = RowType.builder().fields(new IntType(), new VarCharType(50)).build(); + + FileFormat format = FileFormat.fromIdentifier("parquet", new Options()); + FormatWriterFactory writerFactory = format.createWriterFactory(rowType); + Path path = new Path(tempDir.toString() + "/test_fallback.parquet"); + + PositionOutputStream out = fileIO.newOutputStream(path, false); + FormatWriter writer = writerFactory.create(out, "SNAPPY"); + + for (int i = 0; i < 10; i++) { + GenericRow row = new GenericRow(2); + row.setField(0, i); + row.setField(1, BinaryString.fromString("text_" + i)); + writer.addElement(row); + } + writer.close(); + out.close(); + + int fieldCount = rowType.getFieldCount(); + SimpleColStatsCollector.Factory[] statsCollectors = + IntStream.range(0, fieldCount) + .mapToObj(p -> SimpleColStatsCollector.from("full")) + .toArray(SimpleColStatsCollector.Factory[]::new); + SimpleStatsExtractor extractor = + format.createStatsExtractor(rowType, statsCollectors).get(); + + long fileSize = fileIO.getFileSize(path); + + // Extract with null metadata (should fall back to file reading) + SimpleColStats[] fromFallback = extractor.extract(fileIO, path, fileSize, null); + + // Extract from file directly + SimpleColStats[] fromFile = extractor.extract(fileIO, path, fileSize); + + // Results should be identical + assertThat(fromFallback).isEqualTo(fromFile); + } + + @Test + public void testWriterMetadataDefaultIsNull() throws Exception { + // A non-Parquet FormatWriter (or any FormatWriter that doesn't override writerMetadata()) + // should return null by default + FormatWriter mockWriter = + new FormatWriter() { + @Override + public void addElement(org.apache.paimon.data.InternalRow element) {} + + @Override + public boolean reachTargetSize(boolean suggestedCheck, long targetSize) { + return false; + } + + @Override + public void close() {} + }; + + assertThat(mockWriter.writerMetadata()).isNull(); + } +}