Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 44 additions & 10 deletions dlio_benchmark/reader/parquet_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
Configuration (under storage_options in the DLIO YAML):
columns: null # list of column names to read (null = all)
row_group_cache_size: 4 # max row groups held in memory per reader thread
metadata_cache: true # cache parquet footer metadata across opens
memory_map: true # use memory-mapped I/O

Example YAML snippet:
dataset:
Expand All @@ -24,6 +26,8 @@
storage_options:
columns: ["feature1", "label"]
row_group_cache_size: 8
metadata_cache: true
memory_map: true
"""
import bisect

Expand Down Expand Up @@ -57,6 +61,14 @@ def __init__(self, dataset_type, thread_index, epoch):

opts = getattr(self._args, "storage_options", {}) or {}

# Configuration flags
self._use_metadata_cache = opts.get("metadata_cache", True)
self._use_memory_map = opts.get("memory_map", True)

# Metadata cache: filename -> (FileMetaData, cumulative_offsets)
# Caches parquet footer metadata to avoid re-reading it on every open
self._metadata_cache: dict = {}

# Optional column selection (list[str] or None = all columns)
self._columns = opts.get("columns") or None

Expand All @@ -67,7 +79,8 @@ def __init__(self, dataset_type, thread_index, epoch):

self.logger.info(
f"{utcnow()} ParquetReader thread={thread_index} epoch={epoch} "
f"columns={self._columns} rg_cache_size={self._rg_cache_size}"
f"columns={self._columns} rg_cache_size={self._rg_cache_size} "
f"metadata_cache={self._use_metadata_cache} memory_map={self._use_memory_map}"
)

# ── Helpers ──────────────────────────────────────────────────────────────
def open(self, filename):
    """Open *filename* as a Parquet file and index its row groups.

    Returns the tuple ``(ParquetFile, cumulative_offsets)`` that the caller
    stores in ``open_file_map[filename]``:
      - ``cumulative_offsets[i]`` is the first row index of row group *i*;
      - ``cumulative_offsets[-1]`` is the total row count.

    With ``metadata_cache=True`` the parquet footer metadata and the offset
    table are cached per filename, so repeated opens skip the footer read.
    With ``memory_map=True`` the file is opened with memory-mapped I/O.
    """
    # Local import keeps pyarrow an optional dependency of the module.
    import pyarrow.parquet as pq

    cached_meta = None
    cached_offsets = None

    # Cache hit: metadata and offsets are always stored together, so a hit
    # yields both.
    if self._use_metadata_cache:
        cached = self._metadata_cache.get(filename)
        if cached is not None:
            cached_meta, cached_offsets = cached

    # Passing cached metadata lets pyarrow skip re-reading the footer.
    pf = pq.ParquetFile(
        filename,
        memory_map=self._use_memory_map,
        metadata=cached_meta,
    )

    if cached_offsets is not None:
        offsets = cached_offsets
    else:
        # Build cumulative row offsets [0, rg0_rows, rg0+rg1_rows, ...]
        meta = pf.metadata
        offsets = [0]
        for i in range(meta.num_row_groups):
            offsets.append(offsets[-1] + meta.row_group(i).num_rows)
        # BUGFIX: cache only on a miss. The previous placement after the
        # if/else referenced `meta`, which is unbound on the cache-hit path
        # (NameError on every warm open), and would redundantly rewrite the
        # entry with identical data on hits anyway.
        if self._use_metadata_cache:
            self._metadata_cache[filename] = (meta, offsets)

    # Log the actual filename (the old message printed a literal
    # "(unknown)" placeholder).
    self.logger.debug(
        f"{utcnow()} ParquetReader.open {filename} "
        f"row_groups={len(offsets) - 1} total_rows={offsets[-1]}"
    )

    return (pf, offsets)

@dlp.log
Expand Down
Loading