Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.paimon.data.InternalRow;

import javax.annotation.Nullable;

import java.io.Closeable;
import java.io.IOException;

Expand Down Expand Up @@ -49,4 +51,19 @@ public interface FormatWriter extends Closeable {
* @throws IOException Thrown if calculating the length fails.
*/
boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException;

/**
 * Returns format-specific writer metadata that can be used to extract statistics without
 * re-reading the file. This is useful for object stores (like OSS/S3) where the file may not be
 * immediately visible after close.
 *
 * <p>This method should only be called after {@link #close()}; before close the returned value
 * is undefined. The default implementation returns {@code null}, indicating no in-memory
 * metadata is available — callers must handle {@code null} and fall back to reading the file.
 *
 * <p>The concrete type is format-specific (e.g. a Parquet footer); callers that understand a
 * particular format may downcast, others should treat the value as opaque.
 *
 * @return format-specific metadata object, or {@code null} if not available.
 */
@Nullable
default Object writerMetadata() {
    return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,33 @@
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.Pair;

import javax.annotation.Nullable;

import java.io.IOException;

/** Extracts statistics directly from file. */
public interface SimpleStatsExtractor {

SimpleColStats[] extract(FileIO fileIO, Path path, long length) throws IOException;

/**
 * Extract statistics using optional in-memory writer metadata to avoid re-reading the file.
 * When writerMetadata is non-null and the extractor supports it, the stats will be extracted
 * from memory instead of from the file. This avoids issues with object stores where the file
 * may not be immediately visible after close.
 *
 * <p>The default implementation ignores {@code writerMetadata} and delegates to
 * {@link #extract(FileIO, Path, long)}; format-specific extractors override this to use the
 * in-memory metadata.
 *
 * @param fileIO the file IO
 * @param path the file path
 * @param length the file length
 * @param writerMetadata optional format-specific metadata from the writer, or null
 * @return column statistics
 * @throws IOException if reading the file (in the fallback path) fails
 */
default SimpleColStats[] extract(
        FileIO fileIO, Path path, long length, @Nullable Object writerMetadata)
        throws IOException {
    return extract(fileIO, path, length);
}

Pair<SimpleColStats[], FileInfo> extractWithFileInfo(FileIO fileIO, Path path, long length)
throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ public interface SimpleStatsProducer {

SimpleColStats[] extract(FileIO fileIO, Path path, long length) throws IOException;

/**
 * Extract statistics using optional in-memory writer metadata.
 *
 * <p>The default implementation ignores {@code writerMetadata} and delegates to
 * {@link #extract(FileIO, Path, long)}.
 *
 * @param fileIO the file IO
 * @param path the file path
 * @param length the file length
 * @param writerMetadata optional format-specific metadata from the writer, or null
 * @return column statistics
 * @throws IOException if reading the file (in the fallback path) fails
 */
default SimpleColStats[] extract(
        FileIO fileIO, Path path, long length, @Nullable Object writerMetadata)
        throws IOException {
    return extract(fileIO, path, length);
}

static SimpleStatsProducer disabledProducer() {
return new SimpleStatsProducer() {

Expand Down Expand Up @@ -92,6 +103,13 @@ public SimpleColStats[] extract(FileIO fileIO, Path path, long length)
throws IOException {
return extractor.extract(fileIO, path, length);
}

@Override
public SimpleColStats[] extract(
        FileIO fileIO, Path path, long length, @Nullable Object writerMetadata)
        throws IOException {
    // Forward the writer metadata so extractors that support it (e.g. Parquet) can
    // read stats from memory instead of re-reading the file.
    return extractor.extract(fileIO, path, length, writerMetadata);
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
@Nullable private PositionOutputStream out;

@Nullable private Long outputBytes;
@Nullable private Object writerMetadata;
private long recordCount;
protected boolean closed;

Expand Down Expand Up @@ -193,6 +194,7 @@ public void close() throws IOException {
try {
if (writer != null) {
writer.close();
writerMetadata = writer.writerMetadata();
writer = null;
}
if (out != null) {
Expand All @@ -216,4 +218,13 @@ protected long outputBytes() throws IOException {
}
return outputBytes;
}

/**
 * Returns cached writer metadata from the format writer. Available after {@link #close()} is
 * called. Can be used by stats extractors to avoid re-reading the file from object storage.
 *
 * @return format-specific metadata cached at close time, or {@code null} if the writer has not
 *     been closed yet or the format provides no in-memory metadata
 */
@Nullable
protected Object writerMetadata() {
    return writerMetadata;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,6 @@ public SimpleColStats[] fieldStats(long fileSize) throws IOException {
.toArray(SimpleColStats[]::new);
}

return statsProducer.extract(fileIO, path, fileSize);
return statsProducer.extract(fileIO, path, fileSize, writerMetadata());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@
import org.apache.parquet.column.statistics.IntStatistics;
import org.apache.parquet.column.statistics.LongStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.PrimitiveType;

import javax.annotation.Nullable;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
Expand Down Expand Up @@ -74,6 +77,29 @@ public SimpleColStats[] extract(FileIO fileIO, Path path, long length) throws IO
return extractWithFileInfo(fileIO, path, length).getLeft();
}

@Override
public SimpleColStats[] extract(
        FileIO fileIO, Path path, long length, @Nullable Object writerMetadata)
        throws IOException {
    if (!(writerMetadata instanceof ParquetMetadata)) {
        // No usable in-memory footer: fall back to reading stats from the file.
        return extract(fileIO, path, length);
    }

    // Derive stats straight from the cached footer, avoiding a file read. This is
    // critical for object stores (OSS/S3) where the file may not be immediately
    // visible after close.
    Map<String, Statistics<?>> columnStats =
            ParquetUtil.extractColumnStats((ParquetMetadata) writerMetadata);
    SimpleColStatsCollector[] collectors = SimpleColStatsCollector.create(statsCollectors);

    int fieldCount = rowType.getFieldCount();
    SimpleColStats[] result = new SimpleColStats[fieldCount];
    for (int i = 0; i < fieldCount; i++) {
        DataField field = rowType.getFields().get(i);
        result[i] = toFieldStats(field, columnStats.get(field.name()), collectors[i]);
    }
    return result;
}

@Override
public Pair<SimpleColStats[], FileInfo> extractWithFileInfo(
FileIO fileIO, Path path, long length) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,38 @@ public class ParquetUtil {
throws IOException {
try (ParquetFileReader reader = getParquetReader(fileIO, path, length, options)) {
ParquetMetadata parquetMetadata = reader.getFooter();
List<BlockMetaData> blockMetaDataList = parquetMetadata.getBlocks();
Map<String, Statistics<?>> resultStats = new HashMap<>();
for (BlockMetaData blockMetaData : blockMetaDataList) {
List<ColumnChunkMetaData> columnChunkMetaDataList = blockMetaData.getColumns();
for (ColumnChunkMetaData columnChunkMetaData : columnChunkMetaDataList) {
Statistics<?> stats = columnChunkMetaData.getStatistics();
String columnName = columnChunkMetaData.getPath().toDotString();
Statistics<?> midStats;
if (!resultStats.containsKey(columnName)) {
midStats = stats;
} else {
midStats = resultStats.get(columnName);
midStats.mergeStatistics(stats);
}
resultStats.put(columnName, midStats);
Map<String, Statistics<?>> resultStats = extractColumnStats(parquetMetadata);
return Pair.of(resultStats, new SimpleStatsExtractor.FileInfo(reader.getRecordCount()));
}
}

/**
 * Extract column stats from in-memory {@link ParquetMetadata}. This avoids re-reading the file
 * from storage, which is critical for object stores (like OSS/S3) where the file may not be
 * immediately visible after close.
 *
 * <p>Statistics are merged across all row groups. The merged objects stored in the returned
 * map are copies: {@code Statistics.mergeStatistics} mutates its receiver, and the instances
 * obtained from the footer belong to {@code parquetMetadata} — which may be a cached writer
 * footer reused across calls — so it must never be mutated here.
 *
 * @param parquetMetadata the in-memory Parquet metadata (footer)
 * @return result sets as map, key is column name, value is statistics
 */
public static Map<String, Statistics<?>> extractColumnStats(ParquetMetadata parquetMetadata) {
    Map<String, Statistics<?>> resultStats = new HashMap<>();
    for (BlockMetaData blockMetaData : parquetMetadata.getBlocks()) {
        for (ColumnChunkMetaData columnChunkMetaData : blockMetaData.getColumns()) {
            Statistics<?> stats = columnChunkMetaData.getStatistics();
            String columnName = columnChunkMetaData.getPath().toDotString();
            Statistics<?> merged = resultStats.get(columnName);
            if (merged == null) {
                // Copy before storing so that later merges never mutate the
                // Statistics objects owned by the (possibly cached) footer.
                resultStats.put(columnName, stats.copy());
            } else {
                merged.mergeStatistics(stats);
            }
        }
    }
    return resultStats;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.paimon.format.FormatWriter;

import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import javax.annotation.Nullable;

import java.io.IOException;

Expand All @@ -33,6 +36,9 @@ public class ParquetBulkWriter implements FormatWriter {
/** The ParquetWriter to write to. */
private final ParquetWriter<InternalRow> parquetWriter;

/** Cached footer metadata after close, used to avoid re-reading the file for stats. */
@Nullable private ParquetMetadata footerMetadata;

/**
* Creates a new ParquetBulkWriter wrapping the given ParquetWriter.
*
Expand All @@ -50,10 +56,17 @@ public void addElement(InternalRow datum) throws IOException {
@Override
public void close() throws IOException {
    parquetWriter.close();
    // The footer only exists once the writer has been closed; cache it so stats can
    // later be extracted from memory instead of re-reading the file from storage.
    this.footerMetadata = parquetWriter.getFooter();
}

@Override
public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws IOException {
    // Only consult the writer's buffered data size when a check is suggested.
    if (!suggestedCheck) {
        return false;
    }
    return parquetWriter.getDataSize() >= targetSize;
}

/**
 * Returns the cached Parquet footer ({@link ParquetMetadata}) captured at {@link #close()},
 * or {@code null} if the writer has not been closed yet.
 */
@Nullable
@Override
public Object writerMetadata() {
    return footerMetadata;
}
}
Loading
Loading