Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 291 additions & 0 deletions dlio_benchmark/data_generator/arrow_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,291 @@
"""
Copyright (c) 2025, UChicago Argonne, LLC
All Rights Reserved

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os

import numpy as np
import pyarrow as pa

from dlio_benchmark.data_generator.data_generator import DataGenerator
from dlio_benchmark.utils.utility import progress, gen_random_tensor

# All numeric dtypes supported for column generation, keyed by the dtype
# strings accepted in an `arrow_columns` column spec (NumPy side).
_NP_TYPE_MAP = {
    'uint8': np.uint8,
    'uint16': np.uint16,
    'uint32': np.uint32,
    'uint64': np.uint64,
    'int8': np.int8,
    'int16': np.int16,
    'int32': np.int32,
    'int64': np.int64,
    'float16': np.float16,
    'float32': np.float32,
    'float64': np.float64,
}

# PyArrow scalar types matching _NP_TYPE_MAP entry-for-entry; the two maps
# must stay in sync so generated NumPy buffers wrap cleanly into Arrow arrays.
_PA_SCALAR_TYPE_MAP = {
    'uint8': pa.uint8(),
    'uint16': pa.uint16(),
    'uint32': pa.uint32(),
    'uint64': pa.uint64(),
    'int8': pa.int8(),
    'int16': pa.int16(),
    'int32': pa.int32(),
    'int64': pa.int64(),
    'float16': pa.float16(),
    'float32': pa.float32(),
    'float64': pa.float64(),
}


class ArrowGenerator(DataGenerator):
    """
    Schema-driven Arrow IPC data generator.

    Supports two modes:

    1. **Column-schema mode** (``arrow_columns`` config list is non-empty):
       Generates multi-column files from a list of column specs, each with a
       ``name``, ``dtype``, and optional ``size`` (embedding vector length).
       Supported dtypes: uint8/16/32/64, int8/16/32/64, float16/32/64,
       string, binary, bool.

    2. **Legacy mode** (``arrow_columns`` empty):
       Single ``data`` column of fixed-size uint8 lists, matching the original
       DLIO behaviour for backward compatibility.

    Each file is written via the Arrow IPC file format (``pa.ipc.new_file``)
    so that readers can use ``pa.ipc.open_file(pa.memory_map(...))`` for
    zero-copy, memory-mapped access — the OS page cache handles I/O without
    any explicit read() syscalls.

    Key design properties:
    - **Unique samples**: every row in every batch has distinct data.
    - **RNG flow-through**: a single ``np.random.Generator`` is initialised
      once per rank and advanced naturally through all file and batch
      generations. No seed resets occur between files.
    - **Near-zero copy**: numeric columns use ``gen_random_tensor`` with
      ``rng=rng``; once the raw bytes exist they are wrapped in a
      ``FixedSizeListArray`` via ``pa.array()`` using contiguous buffers.
    """

    def __init__(self):
        super().__init__()
        # Column specs from config; empty/absent selects legacy mode.
        self.arrow_columns = getattr(self._args, 'arrow_columns', [])
        # Configured generation batch size; non-positive falls back to 1024.
        # NOTE(review): `generation_batch_size` is not referenced anywhere in
        # `generate()` below — each file is materialised in a single batch of
        # `self.num_samples` rows. Confirm whether chunked generation was
        # intended, or remove this attribute.
        batch = getattr(self._args, 'arrow_generation_batch_size', 0)
        self.generation_batch_size = batch if batch > 0 else 1024

    # ── Schema ───────────────────────────────────────────────────────────────

    def _build_schema(self, legacy_elem_size=None):
        """Build PyArrow schema from configured columns.

        When called in legacy mode (``arrow_columns`` is empty or None),
        ``legacy_elem_size`` must be provided; it is the number of uint8
        elements per sample (= dim1 * dim2). The schema uses a
        ``pa.list_(pa.uint8(), legacy_elem_size)`` fixed-size list, which
        lets PyArrow use the efficient ``FixedSizeListArray`` representation
        on reads.

        When called in column-schema mode, ``legacy_elem_size`` is ignored.
        """
        if not self.arrow_columns:
            # Guard against a falsy elem size (None/0) — fall back to 1 so the
            # fixed-size list type is always valid.
            size = legacy_elem_size or 1
            return pa.schema([('data', pa.list_(pa.uint8(), size))])

        fields = []
        for col_spec in self.arrow_columns:
            # Duck-typed spec: a mapping gives name/dtype/size; any other
            # value (e.g. a bare string) is treated as a float32 scalar column
            # named after the spec itself.
            if hasattr(col_spec, 'get'):
                name = str(col_spec.get('name', 'data'))
                dtype = str(col_spec.get('dtype', 'float32'))
                size = int(col_spec.get('size', 1))
            else:
                name, dtype, size = str(col_spec), 'float32', 1

            pa_scalar = _PA_SCALAR_TYPE_MAP.get(dtype)

            if pa_scalar is not None:
                if size == 1:
                    fields.append(pa.field(name, pa_scalar))
                else:
                    # Fixed-size list of the scalar type
                    fields.append(pa.field(name, pa.list_(pa_scalar, size)))
            elif dtype == 'list':
                fields.append(pa.field(name, pa.list_(pa.float32(), size)))
            elif dtype == 'string':
                fields.append(pa.field(name, pa.string()))
            elif dtype == 'binary':
                fields.append(pa.field(name, pa.binary()))
            elif dtype == 'bool':
                fields.append(pa.field(name, pa.bool_()))
            else:
                # Unknown dtype — fall back to fixed-size float32 list
                fields.append(pa.field(name, pa.list_(pa.float32(), size)))

        return pa.schema(fields)

    # ── Batch generation helpers ──────────────────────────────────────────────

    def _generate_column_data_batch(self, col_spec, batch_size, rng):
        """Generate one batch of data for a single column.

        All dtypes draw from the shared ``rng`` so the RNG state advances
        naturally — no seed is computed or reset between calls.

        Returns ``(name, pa.Array)``. The branch order mirrors
        ``_build_schema`` so the produced array always matches the declared
        field type.
        """
        # Same duck-typed spec parsing as _build_schema — keep in sync.
        if hasattr(col_spec, 'get'):
            name = str(col_spec.get('name', 'data'))
            dtype = str(col_spec.get('dtype', 'float32'))
            size = int(col_spec.get('size', 1))
        else:
            name, dtype, size = str(col_spec), 'float32', 1

        np_type = _NP_TYPE_MAP.get(dtype)
        pa_scalar = _PA_SCALAR_TYPE_MAP.get(dtype)

        # ── Numeric scalar (size == 1) ──────────────────────────────────────
        if np_type is not None and pa_scalar is not None and size == 1:
            data = gen_random_tensor(shape=(batch_size,), dtype=np_type, rng=rng)
            return name, pa.array(data, type=pa_scalar)

        # ── Numeric fixed-size list (size > 1) ─────────────────────────────
        if np_type is not None and pa_scalar is not None:
            # Generate as a flat (batch_size * size) array, then wrap as
            # FixedSizeListArray — no extra copies after the NumPy buffer
            # exists.
            data = gen_random_tensor(shape=(batch_size * size,), dtype=np_type, rng=rng)
            arrow_flat = pa.array(data, type=pa_scalar)
            return name, pa.FixedSizeListArray.from_arrays(arrow_flat, size)

        # Explicit 'list' dtype: fixed-size float32 list.
        if dtype == 'list':
            data = gen_random_tensor(shape=(batch_size * size,), dtype=np.float32, rng=rng)
            arrow_flat = pa.array(data, type=pa.float32())
            return name, pa.FixedSizeListArray.from_arrays(arrow_flat, size)

        # ── Non-numeric types — also drawn from the shared `rng` ───────────
        if dtype == 'string':
            # Use integers from rng to build strings so they vary per run seed
            ints = rng.integers(0, 2**31, size=batch_size)
            return name, pa.array([f"s_{v}" for v in ints], type=pa.string())

        if dtype == 'binary':
            # Each sample: size random bytes from rng
            rows = [rng.bytes(size) for _ in range(batch_size)]
            return name, pa.array(rows, type=pa.binary())

        if dtype == 'bool':
            bits = rng.integers(0, 2, size=batch_size, dtype=np.uint8)
            return name, pa.array(bits.astype(bool), type=pa.bool_())

        # Fallback for unknown dtypes: float32 fixed-size list (matches the
        # fallback branch of _build_schema).
        data = gen_random_tensor(shape=(batch_size * size,), dtype=np.float32, rng=rng)
        arrow_flat = pa.array(data, type=pa.float32())
        return name, pa.FixedSizeListArray.from_arrays(arrow_flat, size)

    def _generate_batch_columns(self, batch_size, rng):
        """Generate all configured columns for one batch.

        The same ``rng`` object is advanced per column so every column in
        every batch gets statistically independent, non-repeating data.
        Returns a dict of ``{column_name: pa.Array}``.
        """
        columns = {}
        for col_spec in self.arrow_columns:
            name, arrow_data = self._generate_column_data_batch(col_spec, batch_size, rng)
            columns[name] = arrow_data
        return columns

    def _generate_legacy_batch(self, elem_size, batch_size, rng):
        """Generate one batch for the legacy single-'data'-column mode.

        Generates ``(batch_size * elem_size)`` bytes in one call, then wraps
        the result as a ``FixedSizeListArray`` — no per-row Python loop, no
        tiling, no copy. Each row is a distinct slice of the random byte
        stream, so samples within the same file are NOT identical.

        ``elem_size`` = dim1 * dim2 (the flat element count per sample).
        """
        flat = gen_random_tensor(shape=(batch_size * elem_size,), dtype=np.uint8, rng=rng)
        arrow_flat = pa.array(flat, type=pa.uint8())
        arrow_data = pa.FixedSizeListArray.from_arrays(arrow_flat, elem_size)
        return {'data': arrow_data}

    # ── Main generation loop ──────────────────────────────────────────────────

    def generate(self):
        """Generate Arrow IPC files using batched, RNG-flow-through generation.

        Seeding:
        - One ``np.random.Generator`` is created per MPI rank, seeded with
          ``BASE_SEED + my_rank``, and advanced through ALL file and batch
          generations without any intermediate resets.
        - This guarantees: (a) cross-file uniqueness — each file starts from a
          different RNG state; (b) within-file uniqueness — each batch and each
          sample row continues from where the previous one left off; (c)
          reproducibility — the same master seed always produces the same files.
        """
        super().generate()

        # NOTE(review): this global-state seed looks redundant — every draw
        # below goes through `rng`, never through np.random's global state.
        # Presumably kept for parity with other DLIO generators; confirm
        # before removing (the matching global reset is at the end of this
        # method).
        np.random.seed(self.BASE_SEED + self.my_rank)
        # Single RNG for the entire rank — never reset between files.
        rng = np.random.default_rng(seed=self.BASE_SEED + self.my_rank)

        # Per-file dimensions from the base class; layout is two entries per
        # file index (see the dim1/dim2 unpacking below).
        dim = self.get_dimension(self.total_files_to_generate)
        is_local = self.storage.islocalfs()

        # Round-robin file assignment across ranks.
        for i in range(self.my_rank, int(self.total_files_to_generate), self.comm_size):
            progress(i + 1, self.total_files_to_generate, "Generating Arrow Data")

            out_path_spec = self.storage.get_uri(self._file_list[i])

            # Compute element size for legacy mode (dim may be list or scalar).
            dim_raw = dim[2 * i]
            if isinstance(dim_raw, list):
                dim1 = int(dim_raw[0])
                dim2 = int(dim_raw[1]) if len(dim_raw) > 1 else 1
            else:
                dim1 = int(dim_raw)
                dim2 = int(dim[2 * i + 1])
            elem_size = dim1 * dim2

            # Build schema (elem_size is ignored in column-schema mode).
            schema = self._build_schema(legacy_elem_size=elem_size)

            # Generate all column data upfront for the entire file.
            if self.arrow_columns:
                full_columns = self._generate_batch_columns(self.num_samples, rng)
            else:
                full_columns = self._generate_legacy_batch(elem_size, self.num_samples, rng)

            table = pa.table(full_columns, schema=schema)

            if is_local:
                parent_dir = os.path.dirname(out_path_spec)
                if parent_dir:
                    os.makedirs(parent_dir, exist_ok=True)
                # Write as Arrow IPC file (random-access format, memory-mappable)
                with pa.OSFile(out_path_spec, 'wb') as sink:
                    writer = pa.ipc.new_file(sink, schema)
                    writer.write_table(table)
                    writer.close()
            else:
                # Remote storage: serialise to an in-memory buffer, then hand
                # the bytes to the storage backend.
                sink = pa.BufferOutputStream()
                writer = pa.ipc.new_file(sink, schema)
                writer.write_table(table)
                writer.close()
                self.storage.put_data(out_path_spec, sink.getvalue().to_pybytes())

        # NOTE(review): re-seeds the GLOBAL numpy RNG from fresh entropy —
        # a deliberate "un-seed" side effect on shared state. Confirm callers
        # rely on this before removing.
        np.random.seed()
3 changes: 3 additions & 0 deletions dlio_benchmark/data_generator/generator_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,8 @@ def get_generator(type):
elif type == FormatType.PARQUET:
from dlio_benchmark.data_generator.parquet_generator import ParquetGenerator
return ParquetGenerator()
elif type == FormatType.ARROW:
from dlio_benchmark.data_generator.arrow_generator import ArrowGenerator
return ArrowGenerator()
else:
raise Exception(str(ErrorCodes.EC1001))
96 changes: 96 additions & 0 deletions dlio_benchmark/reader/arrow_reader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Arrow IPC reader using memory-mapped files for zero-copy access.

Opens Arrow IPC files via ``pa.memory_map()`` + ``pa.ipc.open_file()`` so the
OS page cache handles all I/O — no explicit read() syscalls, no data copies
into Python heap.
"""
import bisect

from dlio_benchmark.common.constants import MODULE_DATA_READER
from dlio_benchmark.reader.reader_handler import FormatReader
from dlio_benchmark.utils.utility import Profile, utcnow

# Module-level profiler handle for the data-reader subsystem; used both as a
# decorator source (@dlp.log, @dlp.log_init) and for metric updates.
dlp = Profile(MODULE_DATA_READER)


class ArrowReader(FormatReader):
    """
    Memory-mapped Arrow IPC reader.

    Files are opened with ``pa.memory_map`` and parsed with
    ``pa.ipc.open_file``, so sample reads are served straight from the kernel
    page cache rather than through explicit read() calls.
    """

    @dlp.log_init
    def __init__(self, dataset_type, thread_index, epoch):
        super().__init__(dataset_type, thread_index)

        self.logger.info(
            f"{utcnow()} ArrowReader thread={thread_index} epoch={epoch}"
        )

    @dlp.log
    def open(self, filename):
        """Memory-map an Arrow IPC file and precompute cumulative row offsets.

        Returns ``(mmap_file, ipc_reader, offsets)`` where ``offsets[k]`` is
        the index of the first row of record batch ``k`` (with a trailing
        total-row-count entry).
        """
        import pyarrow as pa

        mapped = pa.memory_map(filename, 'r')
        ipc_reader = pa.ipc.open_file(mapped)

        # Running row total across record batches -> cumulative offsets.
        offsets = [0]
        running = 0
        for batch_no in range(ipc_reader.num_record_batches):
            running += ipc_reader.get_batch(batch_no).num_rows
            offsets.append(running)

        return (mapped, ipc_reader, offsets)

    @dlp.log
    def close(self, filename):
        """Release the memory map for *filename*, then defer to the base class."""
        entry = self.open_file_map.get(filename)
        if entry is not None:
            entry[0].close()
        super().close(filename)

    @dlp.log
    def get_sample(self, filename, sample_index):
        """Locate and fault in the record batch that holds *sample_index*."""
        mapped, ipc_reader, offsets = self.open_file_map[filename]

        # Binary search the cumulative offsets for the owning batch, clamped
        # to the valid batch range.
        which = bisect.bisect_right(offsets, sample_index) - 1
        if which < 0:
            which = 0
        last_batch = ipc_reader.num_record_batches - 1
        if which > last_batch:
            which = last_batch

        batch = ipc_reader.get_batch(which)

        # Touch one byte per page in every underlying buffer so the mmap
        # actually performs the I/O (page faults) for the whole batch.
        page = 4096
        for column in batch.columns:
            for raw_buf in column.buffers():
                if raw_buf is not None and raw_buf.size > 0:
                    view = memoryview(raw_buf)
                    pos = 0
                    end = len(view)
                    while pos < end:
                        _ = view[pos]
                        pos += page

        dlp.update(image_size=batch.nbytes)

    def next(self):
        # Delegate iteration to the base-class generator unchanged.
        yield from super().next()

    @dlp.log
    def read_index(self, image_idx, step):
        """Record the step, then defer the actual read to the base class."""
        dlp.update(step=step)
        return super().read_index(image_idx, step)

    @dlp.log
    def finalize(self):
        """Defer cleanup to the base class."""
        return super().finalize()

    def is_index_based(self):
        return True

    def is_iterator_based(self):
        return True
Loading
Loading