Conversation
Pull request overview
This PR introduces a new Python-side “dataframe-like” API for reading TsFile table-model data across multiple TsFile shards, enabling series discovery, per-series access, and time-aligned multi-series queries for model training workflows.
Changes:
- Added `TsFileSeriesReader` to discover series (table + tags + field) and read data via Arrow batch queries.
- Added `TsFileDataFrame` (plus `Timeseries`/`AlignedTimeseries`) to unify multiple TsFiles, merge overlapping series, and support `.loc` aligned queries.
- Exported the new classes from `python/tsfile/__init__.py`.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| python/tsfile/tsfile_series_reader.py | New Arrow-based series reader with metadata discovery, timestamp caching, and range/time reads. |
| python/tsfile/tsfile_dataframe.py | New multi-file dataframe-like abstraction with series selection, merged metadata, and .loc alignment. |
| python/tsfile/__init__.py | Re-export TsFileDataFrame, Timeseries, and AlignedTimeseries from the package. |
```python
if arrow_table.num_rows > 0:
    ts_list.append(arrow_table.column('time').to_numpy())
    for fc in field_columns:
        field_lists[fc].append(
            arrow_table.column(fc).to_numpy().astype(np.float64)
        )
```
In _read_arrow, values are always converted via to_numpy().astype(np.float64). This will fail for FIELD columns that are not numeric (e.g., STRING/TEXT/BLOB/DATE) and can also fail when the Arrow array contains nulls (writing code allows NaN values, which become nulls on read). Consider filtering field_columns up-front to numeric TSDataType only, and/or using Arrow casting + null-to-NaN handling before converting to NumPy (and keeping non-numeric columns out of this reader).
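For illustration, a minimal sketch of that up-front filtering plus null-to-NaN handling. It reuses the `arrow_table`, `field_columns`, and `field_lists` names from the snippet above, and an Arrow-level type check stands in for a TSDataType filter:

```python
import pyarrow as pa

# Keep only numeric FIELD columns; string/blob/date columns stay out of this reader.
numeric_fields = [
    fc for fc in field_columns
    if pa.types.is_integer(arrow_table.schema.field(fc).type)
    or pa.types.is_floating(arrow_table.schema.field(fc).type)
]
for fc in numeric_fields:
    col = arrow_table.column(fc).cast(pa.float64())  # cast keeps null slots intact
    field_lists[fc].append(col.to_numpy())           # nulls surface as NaN in the float64 array
```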
```python
if series_path in self._series_data_cache:
    return self._series_data_cache[series_path][start:end].tolist()

info = self.series_info[series_path]
timestamps = self._timestamps_cache[series_path]
```
read_series_range() indexes timestamps[start] and timestamps[end - 1] without validating start/end (including start == end, negative indices, or end > length). This currently raises confusing IndexErrors and makes empty slices impossible. Add explicit range validation and return an empty result for start >= end to match the docstring’s [start, end) semantics.
Suggested change:

```diff
-if series_path in self._series_data_cache:
-    return self._series_data_cache[series_path][start:end].tolist()
-info = self.series_info[series_path]
-timestamps = self._timestamps_cache[series_path]
+if start < 0 or end < 0:
+    raise ValueError("start and end indices must be non-negative")
+# Handle cached series data first, using [start, end) semantics
+if series_path in self._series_data_cache:
+    data = self._series_data_cache[series_path]
+    length = len(data)
+    if start >= length:
+        return []
+    if end > length:
+        end = length
+    if start >= end:
+        return []
+    return data[start:end].tolist()
+info = self.series_info[series_path]
+timestamps = self._timestamps_cache[series_path]
+length = len(timestamps)
+if start >= length:
+    return []
+if end > length:
+    end = length
+if start >= end:
+    return []
```
```python
# Multiple tag columns: use structured approach
# Convert to list of tuples for grouping
n = len(all_times)
tag_tuples = [
    tuple(all_tags[tc][i] for tc in tag_columns)
    for i in range(n)
]
unique_tuples = list(dict.fromkeys(tag_tuples))
for ut in unique_tuples:
    mask = np.array([t == ut for t in tag_tuples], dtype=bool)
    self._register_tag_group(
        table_name, tag_columns, ut,
        field_columns, all_times[mask]
```
The multi-tag grouping path builds tag_tuples in Python and then, for each unique tuple, constructs a full boolean mask with a Python loop ([t == ut for t in tag_tuples]). This is O(n * unique_tags) and will become a bottleneck for large tables. Consider using a vectorized approach (e.g., structured NumPy array + np.unique(..., return_inverse=True) or pandas/groupby) to compute groups and indices in (near) linear time.
Suggested change:

```diff
-# Multiple tag columns: use structured approach
-# Convert to list of tuples for grouping
-n = len(all_times)
-tag_tuples = [
-    tuple(all_tags[tc][i] for tc in tag_columns)
-    for i in range(n)
-]
-unique_tuples = list(dict.fromkeys(tag_tuples))
-for ut in unique_tuples:
-    mask = np.array([t == ut for t in tag_tuples], dtype=bool)
-    self._register_tag_group(
-        table_name, tag_columns, ut,
-        field_columns, all_times[mask]
+# Multiple tag columns: use structured NumPy array for grouping
+n = len(all_times)
+# Build a structured array with one field per tag column
+dtype = [(tc, all_tags[tc].dtype) for tc in tag_columns]
+structured_tags = np.empty(n, dtype=dtype)
+for tc in tag_columns:
+    structured_tags[tc] = all_tags[tc]
+# Find unique tag combinations and an inverse index for grouping
+unique_vals, inverse = np.unique(
+    structured_tags, return_inverse=True
+)
+# Group rows by unique tag combination using the inverse index
+for group_id, ut in enumerate(unique_vals):
+    mask = inverse == group_id
+    tag_values = tuple(ut[tc] for tc in tag_columns)
+    self._register_tag_group(
+        table_name,
+        tag_columns,
+        tag_values,
+        field_columns,
+        all_times[mask],
```
For tables with multiple tag columns, grouping is done by:
- Building a Python list of tuples (line 174-177): O(n)
- For each unique tuple, scanning the entire list to build a boolean mask (line 180): O(n * k) where k = number of unique groups
For a table with 1M rows and 1000 tag groups, this becomes ~1 billion comparisons in pure Python.
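The suggested change above shows the structured-array route; for comparison, a rough sketch of the pandas/groupby alternative mentioned in the comment, reusing the snippet's names and assuming `all_tags` maps each tag column to a 1-D array of length `n`:

```python
import pandas as pd

# Compute row positions per tag combination in roughly linear time.
tags_df = pd.DataFrame({tc: all_tags[tc] for tc in tag_columns})
groups = tags_df.groupby(tag_columns, sort=False).indices  # key -> ndarray of row positions
for tag_values, row_idx in groups.items():
    if not isinstance(tag_values, tuple):  # a single tag column yields a scalar key
        tag_values = (tag_values,)
    self._register_tag_group(
        table_name, tag_columns, tag_values,
        field_columns, all_times[row_idx],
    )
```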
```python
class TsFileDataFrame:
    """
    Lazy-loaded unified view over multiple TsFile files.

    Each dimension is a time series identified by Table + Tags + Field.
    Supports cross-file merging of same-named series.
    """
```
This PR adds a sizable new public API surface (TsFileDataFrame, .loc, cross-file merge semantics, and TsFileSeriesReader-based reading), but there are no accompanying unit tests. The repo already has pytest-based coverage for readers/writers and Arrow batch queries; please add tests that cover: series discovery across multiple files, Timeseries row slicing, .loc alignment for multiple series, and behavior with missing/null values.
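As a starting point, a hedged pytest sketch of the kind of coverage requested. `write_demo_shard` is a hypothetical helper that would create small .tsfile shards with the existing table-model writer; the attributes used on the result are the ones documented for AlignedTimeseries elsewhere in this review:

```python
from tsfile import TsFileDataFrame

def test_loc_alignment_across_shards(tmp_path):
    # Hypothetical helper: writes one small table-model shard per call.
    paths = [write_demo_shard(tmp_path, i) for i in range(2)]

    df = TsFileDataFrame(paths, show_progress=False)
    result = df.loc[:, [0, 1]]  # align the first two series over the full time range

    # .timestamps / .values / .series_names are the documented AlignedTimeseries attributes
    assert result.values.shape == (len(result.timestamps), len(result.series_names))
    assert len(result.series_names) == 2
```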
```python
def read_series_range(self, series_path: str, start: int, end: int) -> List[float]:
    """Read specified range of time series by row index.

    Args:
        series_path: Time series path.
        start: Start index (inclusive).
        end: End index (exclusive).

    Returns:
        List of data points.
    """
    if series_path not in self.series_info:
        raise ValueError(f"Series not found: {series_path}")

    if series_path in self._series_data_cache:
        return self._series_data_cache[series_path][start:end].tolist()

    info = self.series_info[series_path]
    timestamps = self._timestamps_cache[series_path]

    start_time = int(timestamps[start])
    end_time = int(timestamps[end - 1])
```
read_series_range() assumes start < end and will raise or return incorrect results when start == end (it computes end_time = timestamps[end - 1]). Consider explicitly handling empty ranges (return []) and validating bounds early with a clear error (e.g., start < 0, end > len, start > end).
```python
try:
    self._reader = TsFileReaderPy(file_path)
except Exception as e:
    raise ValueError(f"Failed to open TsFile: {e}")
```
When rethrowing the exception from TsFileReaderPy(file_path), the original traceback is lost. Prefer exception chaining (raise ValueError(...) from e) so callers can debug the underlying failure more easily.
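Applied to the snippet above, the chained form would look like:

```python
try:
    self._reader = TsFileReaderPy(file_path)
except Exception as e:
    # "from e" keeps the original traceback attached to the ValueError
    raise ValueError(f"Failed to open TsFile: {e}") from e
```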
```python
    Returned by .loc[...] and df[slice/list]. Supports:
    - result.timestamps -> np.ndarray of ms timestamps
    - result.values -> np.ndarray of shape (rows, cols)
    - result.series_names -> list of series name strings
    - result[i] / result[i, j] -> index into values
    - print(result) -> truncated table (20 rows)
    - result.show() -> full table (no truncation)
    - result.show(50) -> show up to 50 rows
    """
```
Docstring says AlignedTimeseries is "Returned by ... df[slice/list]", but TsFileDataFrame.__getitem__ for slice/list returns a subset TsFileDataFrame, not an AlignedTimeseries. Please update the docstring to match actual behavior (or implement the documented behavior).
```python
class TsFileDataFrame:
    """
    Lazy-loaded unified view over multiple TsFile files.

    Each dimension is a time series identified by Table + Tags + Field.
    Supports cross-file merging of same-named series.
    """

    def __init__(self, paths: Union[str, List[str]], show_progress: bool = True):
        if isinstance(paths, str):
            paths = [paths]

        # Expand directories: collect all .tsfile files within each directory
        expanded = []
        for p in paths:
            if os.path.isdir(p):
                tsfiles = sorted(
                    os.path.join(root, f)
                    for root, _, files in os.walk(p)
                    for f in files
                    if f.endswith('.tsfile')
                )
                if not tsfiles:
                    raise FileNotFoundError(
                        f"No .tsfile files found in directory: {p}"
                    )
                expanded.extend(tsfiles)
            else:
                expanded.append(p)

        self._paths = []
        for p in expanded:
            if not os.path.exists(p):
                raise FileNotFoundError(f"TsFile not found: {p}")
            self._paths.append(os.path.abspath(p))

        self._show_progress = show_progress
        self._readers = {}
        self._series_list: List[str] = []
        self._series_map: Dict[str, list] = {}  # series_path -> [(reader, info), ...]
        self._merged_timestamps: Dict[str, np.ndarray] = {}
        self._merged_info: Dict[str, dict] = {}
        self._name_to_index: Dict[str, int] = {}

        self._is_view = False
        self._root = None

        self._load_metadata()

    @classmethod
    def _from_subset(cls, parent: 'TsFileDataFrame', series_names: List[str]) -> 'TsFileDataFrame':
        """Create a lightweight view over a subset of series.

        Shares readers, series_map, merged_timestamps, and merged_info
        with the root. Does NOT own any readers and will not close them.
        """
        obj = object.__new__(cls)
        obj._root = parent._root if parent._is_view else parent
        obj._is_view = True
        obj._paths = parent._paths
        obj._readers = parent._readers
        obj._series_map = parent._series_map
        obj._merged_timestamps = parent._merged_timestamps
        obj._merged_info = parent._merged_info
        obj._show_progress = parent._show_progress
        obj._series_list = list(series_names)
        obj._name_to_index = {name: i for i, name in enumerate(series_names)}
        return obj

    def _load_metadata(self):
        """Load metadata from all TsFile files."""
        from .tsfile_series_reader import TsFileSeriesReader

        if len(self._paths) >= 2:
            self._load_metadata_parallel(TsFileSeriesReader)
        else:
            self._load_metadata_serial(TsFileSeriesReader)
```
TsFileDataFrame, Timeseries, and .loc[...] introduce substantial new public API and non-trivial merging/alignment logic, but there are currently no pytest tests covering them (no TsFileDataFrame references under python/tests). Please add focused tests for: metadata loading from multiple shards, __getitem__ by index/name/slice, Timeseries row slicing, and .loc time-range alignment across multiple series/files.
```python
def show(self, max_rows: Optional[int] = None):
    """Print formatted table with configurable row limit.

    Args:
        max_rows: Maximum rows to display. None for all rows.
    """
    n_rows, n_cols = self.values.shape
    if n_rows == 0:
        print(f"AlignedTimeseries(0 rows, {n_cols} series)")
        return
    ts_strs, ts_width, col_widths, val_strs = self._build_display()
    print(self._format_rows(ts_strs, ts_width, col_widths, val_strs, max_rows))
```
Pass max_rows to _build_display?
```python
n_rows = len(ts_strs)
n_cols = len(col_widths)

header_parts = ['timestamp'.rjust(ts_width)]
```
The column contains formatted datetime strings, so calling it `timestamp` is misleading.
Consider naming it `time` or `datetime` instead.
```python
col_widths = []
val_strs = []
for col_idx in range(n_cols):
    col_name = self.series_names[col_idx] if col_idx < len(self.series_names) else f'col_{col_idx}'
```
In what case will the `col_{col_idx}` fallback actually be used?
```python
def _build_display(self):
    """Pre-compute string representations for display."""
    n_rows, n_cols = self.values.shape
    ts_strs = [_format_timestamp(int(t)) for t in self.timestamps]
    ts_width = max((len(s) for s in ts_strs), default=0)
    ts_width = max(ts_width, len('timestamp'))

    col_widths = []
    val_strs = []
    for col_idx in range(n_cols):
```
Is it possible to concatenate timestamps and values and then use ndarray's built-in print formatting, if that is sufficient?
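A rough sketch of that idea follows; it would, however, lose the formatted datetime column and the per-series headers, which is presumably why the strings are built by hand:

```python
import numpy as np

# Put raw timestamps next to the value matrix and let NumPy handle formatting/truncation.
combined = np.column_stack([self.timestamps.astype(np.float64), self.values])
print(np.array2string(combined, max_line_width=120, threshold=200, precision=3))
```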
```python
# Deduplicate by timestamp (keep first occurrence)
_, unique_idx = np.unique(merged_ts, return_index=True)
return merged_ts[unique_idx], merged_vals[unique_idx]
```
Why keep the first occurrence?
```python
table_schemas = self._reader.get_all_table_schemas()
if not table_schemas:
    raise ValueError("No tables found in TsFile")

self.series_paths = []
table_names = list(table_schemas.keys())

# Progress tracking
total_rows = 0

for ti, table_name in enumerate(table_names):
    table_schema = self._reader.get_table_schema(table_name)
```
Why not use table_schemas?
```python
if col_name.lower() == 'time':
    continue
```
Also check column category?
```python
# Query TAG columns + first FIELD column to discover groups and timestamps
query_cols = tag_columns + [field_columns[0]]

time_arrays = []
tag_arrays = {tc: [] for tc in tag_columns}

with self._reader.query_table_batch(
    table_name, query_cols, batch_size=65536
) as rs:
```
A TsFile may not actually contain a column's data even if it is in the schema.
Double-check if timestamps can be read in this scenario.
Storing tag values repeatedly puts heavy pressure on memory.
Consider fetching all device IDs first and deriving the tag values from them.
```python
def cache_series_data(self, series_path: str):
    """Pre-load series data into memory cache.

    Args:
        series_path: Time series path.
    """
    if series_path not in self.series_info:
        raise ValueError(f"Series not found: {series_path}")
    if series_path not in self._series_data_cache:
        data = self.read_series(series_path)
        self._series_data_cache[series_path] = np.array(data, dtype=np.float32)
```
Is this method actually used anywhere? There also does not appear to be any memory control for the cache.
```python
    raise ValueError(f"Series not found: {series_path}")
if series_path not in self._series_data_cache:
    data = self.read_series(series_path)
    self._series_data_cache[series_path] = np.array(data, dtype=np.float32)
```
read_series returns float64 values (via _read_arrow which casts to np.float64), but cache_series_data stores them as float32. When read_series is later called on a cached series, it returns float32 values silently, which can lose precision.
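One way to keep the cached dtype consistent with what read_series returns (a one-line sketch):

```python
self._series_data_cache[series_path] = np.asarray(data, dtype=np.float64)
```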
```python
if idx < 0 or idx >= len(self._df._series_list):
    raise IndexError(f"Series index {idx} out of range")
```
Negative indices are not normalized (no idx = length + idx like other __getitem__ methods in this file). A user passing df.loc[:, [-1]] would get an IndexError instead of the last series.
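A sketch of the normalization the other __getitem__ methods in this file reportedly do:

```python
if idx < 0:
    idx += len(self._df._series_list)  # so df.loc[:, [-1]] selects the last series
if idx < 0 or idx >= len(self._df._series_list):
    raise IndexError(f"Series index {idx} out of range")
```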
```python
merged = np.unique(np.concatenate(all_ts))
merged.sort()
```
np.unique already returns a sorted array. The .sort() call is redundant.
```python
if max_rows is None or n_rows <= max_rows:
    show_rows = list(range(n_rows))
else:
    show_rows = list(range(max_rows))
```
When max_rows is exceeded, only the first max_rows rows are shown. In contrast, TsFileDataFrame._format_table shows head + "..." + tail. This inconsistency may confuse users expecting similar behavior from both display methods.
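A sketch of a head-plus-tail selection in the spirit of TsFileDataFrame._format_table; the None entry is a placeholder the formatting code would need to render as "...":

```python
if max_rows is None or n_rows <= max_rows:
    show_rows = list(range(n_rows))
else:
    head = max_rows // 2
    tail = max_rows - head
    # None marks where the "..." separator row would be printed
    show_rows = list(range(head)) + [None] + list(range(n_rows - tail, n_rows))
```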
```diff
 from .tsfile_table_writer import TsFileTableWriter
-from .utils import to_dataframe, dataframe_to_tsfile
\ No newline at end of file
+from .utils import to_dataframe, dataframe_to_tsfile
+from .tsfile_dataframe import TsFileDataFrame, Timeseries, AlignedTimeseries
\ No newline at end of file
```
The file ends without a newline character (\ No newline at end of file).
```python
def close(self):
    """Close all underlying readers.

    No-op for subset views (they don't own readers).
    """
    if self._is_view:
        return
    for reader in self._readers.values():
        reader.close()
    self._readers.clear()
```
After close(), _readers is cleared but _series_map still holds references to closed readers. Any subsequent data access (e.g., tsdf[0][0]) will attempt to read from a closed reader, producing an unclear error.
Recommendation: Either invalidate _series_map too, or set a _closed flag and check it in data-access paths.
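A minimal sketch of the flag-based option; the _closed attribute and _check_open helper are illustrative names, not existing API:

```python
def close(self):
    """Close all underlying readers. No-op for subset views."""
    if self._is_view:
        return
    for reader in self._readers.values():
        reader.close()
    self._readers.clear()
    self._closed = True

def _check_open(self):
    # Call at the top of data-access paths such as __getitem__ and .loc
    if getattr(self, "_closed", False):
        raise RuntimeError("TsFileDataFrame has been closed")
```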
```python
except Exception as e:
    raise ValueError(
        f"Failed to read TsFile metadata. "
        f"Please ensure the TsFile is valid and readable. Error: {e}"
    )
```
The original traceback is lost. Use raise ValueError(...) from e to preserve the exception chain for debugging.
This PR introduces TsFileDataFrame, which can read multiple TsFiles for model-training usage, much like using a dataframe.
https://apache-iotdb-project.feishu.cn/docx/SenJdxlbuoUS5Uxmq7jcOUzdnob?from=from_copylink