From d90e307b6017e201443581d0bdf61892f7b4f906 Mon Sep 17 00:00:00 2001 From: Wolfgang De Salvador Date: Tue, 14 Apr 2026 12:44:28 +0200 Subject: [PATCH] Enhance parquet reader with MMAP and with metadata_cache --- dlio_benchmark/reader/parquet_reader.py | 54 ++++++++++++++++++++----- 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/dlio_benchmark/reader/parquet_reader.py b/dlio_benchmark/reader/parquet_reader.py index 6397e932..ed3be229 100644 --- a/dlio_benchmark/reader/parquet_reader.py +++ b/dlio_benchmark/reader/parquet_reader.py @@ -15,6 +15,8 @@ Configuration (under storage_options in the DLIO YAML): columns: null # list of column names to read (null = all) row_group_cache_size: 4 # max row groups held in memory per reader thread + metadata_cache: true # cache parquet footer metadata across opens + memory_map: true # use memory-mapped I/O Example YAML snippet: dataset: @@ -24,6 +26,8 @@ storage_options: columns: ["feature1", "label"] row_group_cache_size: 8 + metadata_cache: true + memory_map: true """ import bisect @@ -57,6 +61,14 @@ def __init__(self, dataset_type, thread_index, epoch): opts = getattr(self._args, "storage_options", {}) or {} + # Configuration flags + self._use_metadata_cache = opts.get("metadata_cache", True) + self._use_memory_map = opts.get("memory_map", True) + + # Metadata cache: filename -> (FileMetaData, cumulative_offsets) + # Caches parquet footer metadata to avoid re-reading it on every open + self._metadata_cache: dict = {} + # Optional column selection (list[str] or None = all columns) self._columns = opts.get("columns") or None @@ -67,7 +79,8 @@ def __init__(self, dataset_type, thread_index, epoch): self.logger.info( f"{utcnow()} ParquetReader thread={thread_index} epoch={epoch} " - f"columns={self._columns} rg_cache_size={self._rg_cache_size}" + f"columns={self._columns} rg_cache_size={self._rg_cache_size} " + f"metadata_cache={self._use_metadata_cache} memory_map={self._use_memory_map}" ) # ── Helpers ────────────────────────────────────────────────────────────── @@ -88,21 +101,42 @@ def open(self, filename): Returns (ParquetFile, cumulative_offsets) stored in open_file_map[filename]. cumulative_offsets[i] is the first row index of row group i; cumulative_offsets[-1] is the total row count. + + With metadata_cache=True, caches parquet metadata (footer) to avoid re-reading. + With memory_map=True, uses memory-mapped I/O for faster access. """ import pyarrow.parquet as pq - pf = pq.ParquetFile(filename) - meta = pf.metadata + cached_meta = None + cached_offsets = None - # Build cumulative row offsets [0, rg0_rows, rg0+rg1_rows, ...] - offsets = [0] - for i in range(meta.num_row_groups): - offsets.append(offsets[-1] + meta.row_group(i).num_rows) + # Check if metadata is cached + if self._use_metadata_cache: + cached = self._metadata_cache.get(filename) + if cached is not None: + cached_meta, cached_offsets = cached - self.logger.debug( - f"{utcnow()} ParquetReader.open {filename} " - f"row_groups={meta.num_row_groups} total_rows={offsets[-1]}" + # Open the file - pass cached metadata to skip footer read if available + pf = pq.ParquetFile( + filename, + memory_map=self._use_memory_map, + metadata=cached_meta ) + + # Use cached offsets or compute them + if cached_offsets is not None: + offsets = cached_offsets + else: + # Build cumulative row offsets [0, rg0_rows, rg0+rg1_rows, ...] + meta = pf.metadata + offsets = [0] + for i in range(meta.num_row_groups): + offsets.append(offsets[-1] + meta.row_group(i).num_rows) + + # Cache the metadata and offsets + if self._use_metadata_cache: + self._metadata_cache[filename] = (meta, offsets) + return (pf, offsets) @dlp.log