
Support TsfileDataFrame#765

Open
ycycse wants to merge 1 commit into apache:develop from ycycse:tsdf

Conversation

@ycycse
Member

@ycycse ycycse commented Apr 2, 2026

This PR introduces TsFileDataFrame, which can read multiple TsFiles for model-training use, much like working with a dataframe.

https://apache-iotdb-project.feishu.cn/docx/SenJdxlbuoUS5Uxmq7jcOUzdnob?from=from_copylink

Contributor

Copilot AI left a comment


Pull request overview

This PR introduces a new Python-side “dataframe-like” API for reading TsFile table-model data across multiple TsFile shards, enabling series discovery, per-series access, and time-aligned multi-series queries for model training workflows.

Changes:

  • Added TsFileSeriesReader to discover series (table + tags + field) and read data via Arrow batch queries.
  • Added TsFileDataFrame (plus Timeseries / AlignedTimeseries) to unify multiple TsFiles, merge overlapping series, and support .loc aligned queries.
  • Exported the new classes from python/tsfile/__init__.py.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.

| File | Description |
| --- | --- |
| python/tsfile/tsfile_series_reader.py | New Arrow-based series reader with metadata discovery, timestamp caching, and range/time reads. |
| python/tsfile/tsfile_dataframe.py | New multi-file dataframe-like abstraction with series selection, merged metadata, and .loc alignment. |
| python/tsfile/__init__.py | Re-exports TsFileDataFrame, Timeseries, and AlignedTimeseries from the package. |


Comment on lines +402 to +407

```python
if arrow_table.num_rows > 0:
    ts_list.append(arrow_table.column('time').to_numpy())
    for fc in field_columns:
        field_lists[fc].append(
            arrow_table.column(fc).to_numpy().astype(np.float64)
        )
```

Copilot AI Apr 2, 2026


In _read_arrow, values are always converted via to_numpy().astype(np.float64). This will fail for FIELD columns that are not numeric (e.g., STRING/TEXT/BLOB/DATE) and can also fail when the Arrow array contains nulls (writing code allows NaN values, which become nulls on read). Consider filtering field_columns up-front to numeric TSDataType only, and/or using Arrow casting + null-to-NaN handling before converting to NumPy (and keeping non-numeric columns out of this reader).

Contributor


+1

Comment on lines +280 to +285

```python
if series_path in self._series_data_cache:
    return self._series_data_cache[series_path][start:end].tolist()

info = self.series_info[series_path]
timestamps = self._timestamps_cache[series_path]
```

Copilot AI Apr 2, 2026


read_series_range() indexes timestamps[start] and timestamps[end - 1] without validating start/end (including start == end, negative indices, or end > length). This currently raises confusing IndexErrors and makes empty slices impossible. Add explicit range validation and return an empty result for start >= end to match the docstring’s [start, end) semantics.

Suggested change

```python
# Current
if series_path in self._series_data_cache:
    return self._series_data_cache[series_path][start:end].tolist()

info = self.series_info[series_path]
timestamps = self._timestamps_cache[series_path]
```

```python
# Suggested
if start < 0 or end < 0:
    raise ValueError("start and end indices must be non-negative")

# Handle cached series data first, using [start, end) semantics
if series_path in self._series_data_cache:
    data = self._series_data_cache[series_path]
    length = len(data)
    if start >= length:
        return []
    if end > length:
        end = length
    if start >= end:
        return []
    return data[start:end].tolist()

info = self.series_info[series_path]
timestamps = self._timestamps_cache[series_path]
length = len(timestamps)
if start >= length:
    return []
if end > length:
    end = length
if start >= end:
    return []
```

Comment on lines +171 to +183

```python
# Multiple tag columns: use structured approach
# Convert to list of tuples for grouping
n = len(all_times)
tag_tuples = [
    tuple(all_tags[tc][i] for tc in tag_columns)
    for i in range(n)
]
unique_tuples = list(dict.fromkeys(tag_tuples))
for ut in unique_tuples:
    mask = np.array([t == ut for t in tag_tuples], dtype=bool)
    self._register_tag_group(
        table_name, tag_columns, ut,
        field_columns, all_times[mask]
```

Copilot AI Apr 2, 2026


The multi-tag grouping path builds tag_tuples in Python and then, for each unique tuple, constructs a full boolean mask with a Python loop ([t == ut for t in tag_tuples]). This is O(n * unique_tags) and will become a bottleneck for large tables. Consider using a vectorized approach (e.g., structured NumPy array + np.unique(..., return_inverse=True) or pandas/groupby) to compute groups and indices in (near) linear time.

Suggested change

```python
# Current
# Multiple tag columns: use structured approach
# Convert to list of tuples for grouping
n = len(all_times)
tag_tuples = [
    tuple(all_tags[tc][i] for tc in tag_columns)
    for i in range(n)
]
unique_tuples = list(dict.fromkeys(tag_tuples))
for ut in unique_tuples:
    mask = np.array([t == ut for t in tag_tuples], dtype=bool)
    self._register_tag_group(
        table_name, tag_columns, ut,
        field_columns, all_times[mask]
```

```python
# Suggested
# Multiple tag columns: use structured NumPy array for grouping
n = len(all_times)
# Build a structured array with one field per tag column
dtype = [(tc, all_tags[tc].dtype) for tc in tag_columns]
structured_tags = np.empty(n, dtype=dtype)
for tc in tag_columns:
    structured_tags[tc] = all_tags[tc]
# Find unique tag combinations and an inverse index for grouping
unique_vals, inverse = np.unique(
    structured_tags, return_inverse=True
)
# Group rows by unique tag combination using the inverse index
for group_id, ut in enumerate(unique_vals):
    mask = inverse == group_id
    tag_values = tuple(ut[tc] for tc in tag_columns)
    self._register_tag_group(
        table_name,
        tag_columns,
        tag_values,
        field_columns,
        all_times[mask],
```
Contributor


For tables with multiple tag columns, grouping is done by:

  1. Building a Python list of tuples (line 174-177): O(n)
  2. For each unique tuple, scanning the entire list to build a boolean mask (line 180): O(n * k) where k = number of unique groups

For a table with 1M rows and 1000 tag groups, this becomes ~1 billion comparisons in pure Python.
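The structured-array approach can even avoid per-group masks entirely by sorting the inverse index once and splitting, which is near-linear. A standalone sketch on toy data (the names `tag_columns`/`all_tags` mirror the PR; the data is made up):

```python
import numpy as np

# Toy stand-ins for the PR's tag_columns / all_tags
tag_columns = ['region', 'device']
all_tags = {
    'region': np.array(['x', 'x', 'y', 'y', 'x']),
    'device': np.array(['1', '2', '1', '1', '1']),
}
n = len(all_tags['region'])

# One structured field per tag column
structured = np.empty(n, dtype=[(tc, all_tags[tc].dtype) for tc in tag_columns])
for tc in tag_columns:
    structured[tc] = all_tags[tc]

# Unique combinations plus an inverse index, computed once
uniq, inverse = np.unique(structured, return_inverse=True)

# Sort rows by group id and split: each slice holds one group's row indices
order = np.argsort(inverse, kind='stable')
split_points = np.cumsum(np.bincount(inverse))[:-1]
groups = {
    tuple(uniq[g][tc] for tc in tag_columns): idxs
    for g, idxs in enumerate(np.split(order, split_points))
}
```

This replaces the O(n · k) Python scan with one sort plus one pass over the data.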

Comment on lines +399 to +406
class TsFileDataFrame:
"""
Lazy-loaded unified view over multiple TsFile files.

Each dimension is a time series identified by Table + Tags + Field.
Supports cross-file merging of same-named series.
"""


Copilot AI Apr 2, 2026


This PR adds a sizable new public API surface (TsFileDataFrame, .loc, cross-file merge semantics, and TsFileSeriesReader-based reading), but there are no accompanying unit tests. The repo already has pytest-based coverage for readers/writers and Arrow batch queries; please add tests that cover: series discovery across multiple files, Timeseries row slicing, .loc alignment for multiple series, and behavior with missing/null values.

Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.



Comment on lines +266 to +288

```python
def read_series_range(self, series_path: str, start: int, end: int) -> List[float]:
    """Read specified range of time series by row index.

    Args:
        series_path: Time series path.
        start: Start index (inclusive).
        end: End index (exclusive).

    Returns:
        List of data points.
    """
    if series_path not in self.series_info:
        raise ValueError(f"Series not found: {series_path}")

    if series_path in self._series_data_cache:
        return self._series_data_cache[series_path][start:end].tolist()

    info = self.series_info[series_path]
    timestamps = self._timestamps_cache[series_path]

    start_time = int(timestamps[start])
    end_time = int(timestamps[end - 1])
```


Copilot AI Apr 2, 2026


read_series_range() assumes start < end and will raise or return incorrect results when start == end (it computes end_time = timestamps[end - 1]). Consider explicitly handling empty ranges (return []) and validating bounds early with a clear error (e.g., start < 0, end > len, start > end).
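One way to pin down the [start, end) semantics is a small normalization helper that the method could call up front (a sketch; the name `clamp_range` is mine, not the PR's):

```python
def clamp_range(start: int, end: int, length: int) -> tuple:
    """Validate and clamp a [start, end) request against a sequence length.
    Empty or inverted ranges collapse to (0, 0) so callers can return []."""
    if start < 0 or end < 0:
        raise ValueError("start and end must be non-negative")
    start = min(start, length)
    end = min(end, length)
    if start >= end:
        return (0, 0)
    return (start, end)
```

With this in place, `start == end` and out-of-bounds requests yield empty results instead of IndexErrors from `timestamps[end - 1]`.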

Comment on lines +55 to +58

```python
try:
    self._reader = TsFileReaderPy(file_path)
except Exception as e:
    raise ValueError(f"Failed to open TsFile: {e}")
```

Copilot AI Apr 2, 2026


When rethrowing the exception from TsFileReaderPy(file_path), the original traceback is lost. Prefer exception chaining (raise ValueError(...) from e) so callers can debug the underlying failure more easily.
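A minimal illustration of the chaining pattern (`open_tsfile` is a failing stand-in, not the real constructor):

```python
def open_tsfile(path):
    """Stand-in for TsFileReaderPy that always fails."""
    raise OSError("bad magic number")

def open_or_raise(path):
    try:
        return open_tsfile(path)
    except Exception as e:
        # 'from e' preserves the OSError as __cause__ in the traceback
        raise ValueError(f"Failed to open TsFile: {path}") from e
```

Callers then see both the ValueError and, chained below it, the original low-level error.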

Comment on lines +49 to +57
Returned by .loc[...] and df[slice/list]. Supports:
- result.timestamps -> np.ndarray of ms timestamps
- result.values -> np.ndarray of shape (rows, cols)
- result.series_names -> list of series name strings
- result[i] / result[i, j] -> index into values
- print(result) -> truncated table (20 rows)
- result.show() -> full table (no truncation)
- result.show(50) -> show up to 50 rows
"""

Copilot AI Apr 2, 2026


Docstring says AlignedTimeseries is "Returned by ... df[slice/list]", but TsFileDataFrame.__getitem__ for slice/list returns a subset TsFileDataFrame, not an AlignedTimeseries. Please update the docstring to match actual behavior (or implement the documented behavior).

Comment on lines +399 to +476

```python
class TsFileDataFrame:
    """
    Lazy-loaded unified view over multiple TsFile files.

    Each dimension is a time series identified by Table + Tags + Field.
    Supports cross-file merging of same-named series.
    """

    def __init__(self, paths: Union[str, List[str]], show_progress: bool = True):
        if isinstance(paths, str):
            paths = [paths]

        # Expand directories: collect all .tsfile files within each directory
        expanded = []
        for p in paths:
            if os.path.isdir(p):
                tsfiles = sorted(
                    os.path.join(root, f)
                    for root, _, files in os.walk(p)
                    for f in files
                    if f.endswith('.tsfile')
                )
                if not tsfiles:
                    raise FileNotFoundError(
                        f"No .tsfile files found in directory: {p}"
                    )
                expanded.extend(tsfiles)
            else:
                expanded.append(p)

        self._paths = []
        for p in expanded:
            if not os.path.exists(p):
                raise FileNotFoundError(f"TsFile not found: {p}")
            self._paths.append(os.path.abspath(p))

        self._show_progress = show_progress
        self._readers = {}
        self._series_list: List[str] = []
        self._series_map: Dict[str, list] = {}  # series_path -> [(reader, info), ...]
        self._merged_timestamps: Dict[str, np.ndarray] = {}
        self._merged_info: Dict[str, dict] = {}
        self._name_to_index: Dict[str, int] = {}

        self._is_view = False
        self._root = None

        self._load_metadata()

    @classmethod
    def _from_subset(cls, parent: 'TsFileDataFrame', series_names: List[str]) -> 'TsFileDataFrame':
        """Create a lightweight view over a subset of series.

        Shares readers, series_map, merged_timestamps, and merged_info
        with the root. Does NOT own any readers and will not close them.
        """
        obj = object.__new__(cls)
        obj._root = parent._root if parent._is_view else parent
        obj._is_view = True
        obj._paths = parent._paths
        obj._readers = parent._readers
        obj._series_map = parent._series_map
        obj._merged_timestamps = parent._merged_timestamps
        obj._merged_info = parent._merged_info
        obj._show_progress = parent._show_progress
        obj._series_list = list(series_names)
        obj._name_to_index = {name: i for i, name in enumerate(series_names)}
        return obj

    def _load_metadata(self):
        """Load metadata from all TsFile files."""
        from .tsfile_series_reader import TsFileSeriesReader

        if len(self._paths) >= 2:
            self._load_metadata_parallel(TsFileSeriesReader)
        else:
            self._load_metadata_serial(TsFileSeriesReader)
```


Copilot AI Apr 2, 2026


TsFileDataFrame, Timeseries, and .loc[...] introduce substantial new public API and non-trivial merging/alignment logic, but there are currently no pytest tests covering them (no TsFileDataFrame references under python/tests). Please add focused tests for: metadata loading from multiple shards, __getitem__ by index/name/slice, Timeseries row slicing, and .loc time-range alignment across multiple series/files.
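One way to make such tests precise is to pin the alignment semantics with a tiny reference implementation: union-align two (timestamps, values) series, filling gaps with NaN. This is my reading of the described .loc behavior, not the PR's code:

```python
import numpy as np

def align_two(ts1, v1, ts2, v2):
    """Union-align two (timestamps, values) series; missing points become NaN."""
    merged = np.unique(np.concatenate([ts1, ts2]))  # np.unique output is sorted
    out = np.full((len(merged), 2), np.nan)
    out[np.searchsorted(merged, ts1), 0] = v1
    out[np.searchsorted(merged, ts2), 1] = v2
    return merged, out
```

A test could then compare `.loc` output for two series across files against this oracle.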

Comment on lines +128 to +139
def show(self, max_rows: Optional[int] = None):
"""Print formatted table with configurable row limit.

Args:
max_rows: Maximum rows to display. None for all rows.
"""
n_rows, n_cols = self.values.shape
if n_rows == 0:
print(f"AlignedTimeseries(0 rows, {n_cols} series)")
return
ts_strs, ts_width, col_widths, val_strs = self._build_display()
print(self._format_rows(ts_strs, ts_width, col_widths, val_strs, max_rows))
Contributor


Pass max_rows to _build_display?

```python
n_rows = len(ts_strs)
n_cols = len(col_widths)

header_parts = ['timestamp'.rjust(ts_width)]
```
Contributor


The column holds a formatted datetime string, so calling it `timestamp` is misleading. Consider `time` or `datetime` instead.

```python
col_widths = []
val_strs = []
for col_idx in range(n_cols):
    col_name = self.series_names[col_idx] if col_idx < len(self.series_names) else f'col_{col_idx}'
```
Contributor


In what case will col_xxx be used?

Comment on lines +74 to +83
def _build_display(self):
"""Pre-compute string representations for display."""
n_rows, n_cols = self.values.shape
ts_strs = [_format_timestamp(int(t)) for t in self.timestamps]
ts_width = max((len(s) for s in ts_strs), default=0)
ts_width = max(ts_width, len('timestamp'))

col_widths = []
val_strs = []
for col_idx in range(n_cols):
Contributor


Is it possible to concat timestamps and values, then use ndarray's print method, if any?

Comment on lines +260 to +262

```python
# Deduplicate by timestamp (keep first occurrence)
_, unique_idx = np.unique(merged_ts, return_index=True)
return merged_ts[unique_idx], merged_vals[unique_idx]
```
Contributor


Why keep the first occurrence?
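For reference, this is what the dedup does on overlapping data: `return_index` points at the first occurrence of each unique timestamp, so whichever slice came first in the concatenation wins (toy data; any later conflicting value is silently dropped):

```python
import numpy as np

merged_ts = np.array([1, 2, 2, 3])
merged_vals = np.array([10., 20., 21., 30.])

# return_index gives indices of the FIRST occurrence of each unique value,
# so the conflicting value 21. at the duplicate timestamp 2 is discarded
_, unique_idx = np.unique(merged_ts, return_index=True)
dedup_ts, dedup_vals = merged_ts[unique_idx], merged_vals[unique_idx]
```

Whether first-wins is the right conflict policy (versus last-wins or an error) is the question the reviewer is raising.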

Comment on lines +94 to +105
table_schemas = self._reader.get_all_table_schemas()
if not table_schemas:
raise ValueError("No tables found in TsFile")

self.series_paths = []
table_names = list(table_schemas.keys())

# Progress tracking
total_rows = 0

for ti, table_name in enumerate(table_names):
table_schema = self._reader.get_table_schema(table_name)
Contributor


Why not use table_schemas?

Comment on lines +112 to +113

```python
if col_name.lower() == 'time':
    continue
```
Contributor


Also check column category?

Comment on lines +122 to +130
# Query TAG columns + first FIELD column to discover groups and timestamps
query_cols = tag_columns + [field_columns[0]]

time_arrays = []
tag_arrays = {tc: [] for tc in tag_columns}

with self._reader.query_table_batch(
table_name, query_cols, batch_size=65536
) as rs:
Contributor


A TsFile may not actually contain a column's data even if it is in the schema.
Double-check if timestamps can be read in this scenario.

Contributor


Storing tags repeatedly puts significant pressure on memory.
Consider fetching all DeviceIds first and deriving the tag values from them.

Comment on lines +419 to +429
def cache_series_data(self, series_path: str):
"""Pre-load series data into memory cache.

Args:
series_path: Time series path.
"""
if series_path not in self.series_info:
raise ValueError(f"Series not found: {series_path}")
if series_path not in self._series_data_cache:
data = self.read_series(series_path)
self._series_data_cache[series_path] = np.array(data, dtype=np.float32)
Contributor


Is this method actually used anywhere? There also doesn't appear to be any memory control.

```python
    raise ValueError(f"Series not found: {series_path}")
if series_path not in self._series_data_cache:
    data = self.read_series(series_path)
    self._series_data_cache[series_path] = np.array(data, dtype=np.float32)
```
Contributor


read_series returns float64 values (via _read_arrow which casts to np.float64), but cache_series_data stores them as float32. When read_series is later called on a cached series, it returns float32 values silently, which can lose precision.
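The precision loss is easy to demonstrate: float32 carries only ~7 significant decimal digits, so large timestamps-scale magnitudes lose their fractional part entirely (toy value):

```python
import numpy as np

v64 = np.array([1e9 + 0.125], dtype=np.float64)
v32_roundtrip = v64.astype(np.float32).astype(np.float64)
# Near 1e9 the float32 spacing is 64, so the 0.125 fraction vanishes
lost = v32_roundtrip[0] != v64[0]
```

Either cache as float64 or document the downcast explicitly.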


Comment on lines +316 to +317

```python
if idx < 0 or idx >= len(self._df._series_list):
    raise IndexError(f"Series index {idx} out of range")
```
Contributor


Negative indices are not normalized (no idx = length + idx like other __getitem__ methods in this file). A user passing df.loc[:, [-1]] would get an IndexError instead of the last series.
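Normalization follows the usual `list.__getitem__` convention; a sketch (the helper name is mine, not the PR's):

```python
def normalize_index(idx: int, length: int) -> int:
    """Map Python-style negative indices to positive ones, list-style."""
    if idx < 0:
        idx += length
    if idx < 0 or idx >= length:
        raise IndexError(f"Series index {idx} out of range")
    return idx
```

With this, `df.loc[:, [-1]]` would resolve -1 to the last series instead of raising.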

Comment on lines +567 to +568

```python
merged = np.unique(np.concatenate(all_ts))
merged.sort()
```
Contributor


np.unique already returns a sorted array. The .sort() call is redundant.

Comment on lines +108 to +111
if max_rows is None or n_rows <= max_rows:
show_rows = list(range(n_rows))
else:
show_rows = list(range(max_rows))
Contributor


When max_rows is exceeded, only the first max_rows rows are shown. In contrast, TsFileDataFrame._format_table shows head + "..." + tail. This inconsistency may confuse users expecting similar behavior from both display methods.
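A head + "..." + tail selection matching the described `_format_table` style could look like this (a sketch; `-1` stands for the elision marker row, and the function name is mine):

```python
def select_display_rows(n_rows: int, max_rows):
    """Return row indices to display, with -1 marking an elision ('...') row,
    mirroring a head + '...' + tail layout."""
    if max_rows is None or n_rows <= max_rows:
        return list(range(n_rows))
    head = max_rows // 2
    tail = max_rows - head
    return list(range(head)) + [-1] + list(range(n_rows - tail, n_rows))
```

`show()` could then share this helper with `_format_table` so both displays behave consistently.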

```diff
 from .tsfile_table_writer import TsFileTableWriter
-from .utils import to_dataframe, dataframe_to_tsfile
\ No newline at end of file
+from .utils import to_dataframe, dataframe_to_tsfile
+from .tsfile_dataframe import TsFileDataFrame, Timeseries, AlignedTimeseries
\ No newline at end of file
```
Contributor


The file ends without a newline character (\ No newline at end of file).

Comment on lines +837 to +846
def close(self):
"""Close all underlying readers.

No-op for subset views (they don't own readers).
"""
if self._is_view:
return
for reader in self._readers.values():
reader.close()
self._readers.clear()
Contributor


After close(), _readers is cleared but _series_map still holds references to closed readers. Any subsequent data access (e.g., tsdf[0][0]) will attempt to read from a closed reader, producing an unclear error.

Recommendation: Either invalidate _series_map too, or set a _closed flag and check it in data-access paths.
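The `_closed`-flag pattern is straightforward; a minimal sketch of the guard shape (not the PR's class; the names here are illustrative):

```python
class ShardedReader:
    """Illustrates guarding data access after close()."""
    def __init__(self):
        self._readers = {"shard0": object()}
        self._closed = False

    def close(self):
        for _reader in self._readers.values():
            pass  # real code would call reader.close() here
        self._readers.clear()
        self._closed = True

    def read(self, name):
        if self._closed:
            raise RuntimeError("TsFileDataFrame is closed")
        return self._readers[name]
```

The explicit RuntimeError replaces whatever obscure error a closed native reader would otherwise raise.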

Comment on lines +82 to +86
except Exception as e:
raise ValueError(
f"Failed to read TsFile metadata. "
f"Please ensure the TsFile is valid and readable. Error: {e}"
)
Contributor


The original traceback is lost. Use raise ValueError(...) from e to preserve the exception chain for debugging.

