diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py index 55fa3b9e4..1149e6607 100644 --- a/python/tsfile/__init__.py +++ b/python/tsfile/__init__.py @@ -40,4 +40,5 @@ from .tsfile_writer import TsFileWriterPy as TsFileWriter from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config from .tsfile_table_writer import TsFileTableWriter -from .utils import to_dataframe, dataframe_to_tsfile \ No newline at end of file +from .utils import to_dataframe, dataframe_to_tsfile +from .tsfile_dataframe import TsFileDataFrame, Timeseries, AlignedTimeseries \ No newline at end of file diff --git a/python/tsfile/tsfile_dataframe.py b/python/tsfile/tsfile_dataframe.py new file mode 100644 index 000000000..09497e277 --- /dev/null +++ b/python/tsfile/tsfile_dataframe.py @@ -0,0 +1,859 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +TsFileDataFrame - Lazy-loaded unified view over multiple TsFile files. + +Provides array-like and DataFrame-like access to time series data +stored in TsFile format. Supports: +1. Pre-training: random index access with array-like slicing +2. Post-training: time-aligned multi-series queries via .loc +""" + +import os +import sys +from collections import defaultdict +from typing import List, Dict, Union, Optional, Tuple +from datetime import datetime + +import numpy as np + + +def _format_timestamp(ts_ms: int) -> str: + """Convert millisecond timestamp to human-readable string.""" + try: + return datetime.fromtimestamp(ts_ms / 1000).strftime('%Y-%m-%d %H:%M:%S') + except (OSError, ValueError): + return str(ts_ms) + + +class AlignedTimeseries: + """ + Time-aligned multi-series query result with timestamps. + + Returned by .loc[...] and df[slice/list]. 
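    A minimal illustration (tsdf and the series name are hypothetical):

        res = tsdf.loc[1700000000000:1700003600000, ['weather.Beijing.humidity']]
        res.timestamps      # ms timestamps covering the queried range
        res.values[:, 0]    # humidity values, NaN where a row has no sample
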
Supports: + - result.timestamps -> np.ndarray of ms timestamps + - result.values -> np.ndarray of shape (rows, cols) + - result.series_names -> list of series name strings + - result[i] / result[i, j] -> index into values + - print(result) -> truncated table (20 rows) + - result.show() -> full table (no truncation) + - result.show(50) -> show up to 50 rows + """ + + def __init__(self, timestamps: np.ndarray, values: np.ndarray, series_names: List[str]): + self.timestamps = timestamps + self.values = values + self.series_names = series_names + + @property + def shape(self): + return self.values.shape + + def __len__(self): + return len(self.timestamps) + + def __getitem__(self, key): + return self.values[key] + + def _build_display(self): + """Pre-compute string representations for display.""" + n_rows, n_cols = self.values.shape + ts_strs = [_format_timestamp(int(t)) for t in self.timestamps] + ts_width = max((len(s) for s in ts_strs), default=0) + ts_width = max(ts_width, len('timestamp')) + + col_widths = [] + val_strs = [] + for col_idx in range(n_cols): + col_name = self.series_names[col_idx] if col_idx < len(self.series_names) else f'col_{col_idx}' + w = len(col_name) + col_vals = [] + for row_idx in range(n_rows): + v = self.values[row_idx, col_idx] + s = 'NaN' if np.isnan(v) else f'{v:.2f}' + col_vals.append(s) + w = max(w, len(s)) + val_strs.append(col_vals) + col_widths.append(w) + + return ts_strs, ts_width, col_widths, val_strs + + def _format_rows(self, ts_strs, ts_width, col_widths, val_strs, max_rows): + """Format rows with optional truncation.""" + n_rows = len(ts_strs) + n_cols = len(col_widths) + + header_parts = ['timestamp'.rjust(ts_width)] + for col_idx in range(n_cols): + col_name = self.series_names[col_idx] if col_idx < len(self.series_names) else f'col_{col_idx}' + header_parts.append(col_name.rjust(col_widths[col_idx])) + lines = [' '.join(header_parts)] + + if max_rows is None or n_rows <= max_rows: + show_rows = list(range(n_rows)) + else: + show_rows = list(range(max_rows)) + + for row_idx in show_rows: + parts = [ts_strs[row_idx].rjust(ts_width)] + for col_idx in range(n_cols): + parts.append(val_strs[col_idx][row_idx].rjust(col_widths[col_idx])) + lines.append(' '.join(parts)) + + return f"AlignedTimeseries({n_rows} rows, {n_cols} series)\n" + '\n'.join(lines) + + def __repr__(self): + n_rows, n_cols = self.values.shape + if n_rows == 0: + return f"AlignedTimeseries(0 rows, {n_cols} series)" + ts_strs, ts_width, col_widths, val_strs = self._build_display() + return self._format_rows(ts_strs, ts_width, col_widths, val_strs, max_rows=20) + + def show(self, max_rows: Optional[int] = None): + """Print formatted table with configurable row limit. + + Args: + max_rows: Maximum rows to display. None for all rows. + """ + n_rows, n_cols = self.values.shape + if n_rows == 0: + print(f"AlignedTimeseries(0 rows, {n_cols} series)") + return + ts_strs, ts_width, col_widths, val_strs = self._build_display() + print(self._format_rows(ts_strs, ts_width, col_widths, val_strs, max_rows)) + + +class Timeseries: + """ + Single time series abstraction. + + Supports row-based slicing (converting to time-range queries internally), + stats access, and length queries. + + When a series spans multiple files, data is merged transparently. + """ + + def __init__(self, name: str, readers_and_infos: list, merged_timestamps: np.ndarray): + """ + Args: + name: Full series path (e.g., "weather.Beijing.humidity"). + readers_and_infos: List of (reader, series_info) tuples for this series. 
+ merged_timestamps: Sorted, deduplicated timestamp array across all files. + """ + self._name = name + self._readers_and_infos = readers_and_infos + self._timestamps = merged_timestamps + + @property + def name(self) -> str: + return self._name + + @property + def timestamps(self) -> np.ndarray: + return self._timestamps + + @property + def stats(self) -> dict: + count = len(self._timestamps) + if count == 0: + return {'start_time': None, 'end_time': None, 'count': 0} + return { + 'start_time': int(self._timestamps[0]), + 'end_time': int(self._timestamps[-1]), + 'count': count, + } + + def __len__(self) -> int: + return len(self._timestamps) + + def __getitem__(self, key): + """Row-based access. + + - series[20] -> single float value + - series[20:100] -> np.ndarray of values + """ + length = len(self._timestamps) + + if isinstance(key, int): + if key < 0: + key = length + key + if key < 0 or key >= length: + raise IndexError(f"Index {key} out of range [0, {length})") + ts = int(self._timestamps[key]) + _, vals = self._query_time_range(ts, ts) + return float(vals[0]) if len(vals) > 0 else None + + elif isinstance(key, slice): + start, stop, step = key.indices(length) + if start >= stop: + return np.array([], dtype=np.float64) + + # Get exact timestamps for the requested rows + requested_ts = self._timestamps[start:stop] + if len(requested_ts) == 0: + return np.array([], dtype=np.float64) + + # Query by time range, then filter to exact timestamps + start_ts = int(requested_ts[0]) + end_ts = int(requested_ts[-1]) + ts_arr, vals = self._query_time_range(start_ts, end_ts) + + # Vectorized alignment: both ts_arr and requested_ts are sorted + result = np.full(len(requested_ts), np.nan) + if len(ts_arr) > 0: + indices = np.searchsorted(ts_arr, requested_ts) + valid = (indices < len(ts_arr)) & (ts_arr[np.minimum(indices, len(ts_arr) - 1)] == requested_ts) + result[valid] = vals[indices[valid]] + vals = result + + if step != 1: + vals = vals[::step] + return vals + + else: + raise TypeError(f"Unsupported key type: {type(key)}") + + def _query_time_range(self, start_time: int, end_time: int) -> Tuple[np.ndarray, np.ndarray]: + """Query all readers for this series in the given time range, merge results.""" + all_ts = [] + all_vals = [] + for reader, info in self._readers_and_infos: + # Skip reader if its data doesn't overlap + if info['max_time'] < start_time or info['min_time'] > end_time: + continue + ts_arr, val_arr = reader.read_series_by_time_range( + self._name, start_time, end_time + ) + if len(ts_arr) > 0: + all_ts.append(ts_arr) + all_vals.append(val_arr) + + if not all_ts: + return np.array([], dtype=np.int64), np.array([], dtype=np.float64) + + if len(all_ts) == 1: + return all_ts[0], all_vals[0] + + # Merge from multiple readers, sort by timestamp, deduplicate + merged_ts = np.concatenate(all_ts) + merged_vals = np.concatenate(all_vals) + sort_idx = np.argsort(merged_ts, kind='mergesort') + merged_ts = merged_ts[sort_idx] + merged_vals = merged_vals[sort_idx] + + # Deduplicate by timestamp (keep first occurrence) + _, unique_idx = np.unique(merged_ts, return_index=True) + return merged_ts[unique_idx], merged_vals[unique_idx] + + def __repr__(self): + stats = self.stats + if stats['count'] == 0: + return f"Timeseries('{self._name}', count=0)" + return ( + f"Timeseries('{self._name}', count={stats['count']}, " + f"start={_format_timestamp(stats['start_time'])}, " + f"end={_format_timestamp(stats['end_time'])})" + ) + + +class _LocIndexer: + """ + Implements .loc[start_time:end_time, 
series_list] for time-aligned queries. + + Returns AlignedTimeseries with timestamps, values, and series names. + """ + + def __init__(self, dataframe: 'TsFileDataFrame'): + self._df = dataframe + + def _parse_key(self, key): + """Parse key into (start_time, end_time, series_names).""" + if not isinstance(key, tuple) or len(key) != 2: + raise ValueError( + "loc requires exactly 2 arguments: " + "tsdf.loc[start_time:end_time, series_list]" + ) + + time_slice, series_spec = key + + # Parse time range + if isinstance(time_slice, slice): + start_time = time_slice.start + end_time = time_slice.stop + if start_time is None: + start_time = np.iinfo(np.int64).min + if end_time is None: + end_time = np.iinfo(np.int64).max + elif isinstance(time_slice, (int, np.integer)): + start_time = end_time = int(time_slice) + else: + raise TypeError(f"Time index must be slice or int, got {type(time_slice)}") + + # Parse series names + if isinstance(series_spec, (str, int)): + series_spec = [series_spec] + + series_names = [] + for s in series_spec: + if isinstance(s, (int, np.integer)): + idx = int(s) + if idx < 0 or idx >= len(self._df._series_list): + raise IndexError(f"Series index {idx} out of range") + series_names.append(self._df._series_list[idx]) + elif isinstance(s, str): + if s not in self._df._series_map: + raise KeyError(f"Series not found: {s}") + series_names.append(s) + else: + raise TypeError(f"Series specifier must be int or str, got {type(s)}") + + return start_time, end_time, series_names + + def _query_aligned(self, start_time: int, end_time: int, series_names: List[str]): + """Query and align multiple series using batched Arrow reads.""" + # Group series by (reader_id, table_name, tag_tuple) for batch queries. + # Each group can be fetched with a single query_table_batch call. 
+ groups = defaultdict(list) # key -> [(col_idx, field_name, series_name, reader, info)] + + for col_idx, name in enumerate(series_names): + entries = self._df._series_map[name] + for reader, info in entries: + if info['max_time'] < start_time or info['min_time'] > end_time: + continue + key = (id(reader), info['table_name'], + tuple(sorted(info['tag_values'].items()))) + groups[key].append((col_idx, info['column_name'], name, reader, info)) + + # Fetch data: one query per group + series_data = {} # series_name -> (ts_arr, val_arr) + + for key, entries in groups.items(): + reader = entries[0][3] + info = entries[0][4] + field_columns = list(dict.fromkeys(e[1] for e in entries)) # dedupe, keep order + + ts_arr, field_vals = reader.read_multi_series_by_time_range( + info['table_name'], field_columns, + info['tag_columns'], info['tag_values'], + start_time, end_time, + ) + + for _col_idx, field_name, name, _, _ in entries: + if name in series_data: + # Series spans multiple readers: merge + prev_ts, prev_val = series_data[name] + merged_ts = np.concatenate([prev_ts, ts_arr]) + merged_val = np.concatenate([prev_val, field_vals[field_name]]) + sort_idx = np.argsort(merged_ts, kind='mergesort') + merged_ts = merged_ts[sort_idx] + merged_val = merged_val[sort_idx] + _, unique_idx = np.unique(merged_ts, return_index=True) + series_data[name] = (merged_ts[unique_idx], merged_val[unique_idx]) + else: + series_data[name] = (ts_arr, field_vals[field_name]) + + # Collect unique timestamps using numpy + all_ts_arrays = [data[0] for data in series_data.values() if len(data[0]) > 0] + if not all_ts_arrays: + return (np.array([], dtype=np.int64), + np.array([]).reshape(0, len(series_names))) + + sorted_timestamps = np.unique(np.concatenate(all_ts_arrays)) + + # Build result matrix using np.searchsorted for vectorized alignment + result = np.full((len(sorted_timestamps), len(series_names)), np.nan) + + for col_idx, name in enumerate(series_names): + if name not in series_data: + continue + ts_arr, val_arr = series_data[name] + if len(ts_arr) == 0: + continue + indices = np.searchsorted(sorted_timestamps, ts_arr) + result[indices, col_idx] = val_arr + + return sorted_timestamps, result + + def __getitem__(self, key) -> 'AlignedTimeseries': + start_time, end_time, series_names = self._parse_key(key) + timestamps, values = self._query_aligned(start_time, end_time, series_names) + return AlignedTimeseries(timestamps, values, series_names) + + +class TsFileDataFrame: + """ + Lazy-loaded unified view over multiple TsFile files. + + Each dimension is a time series identified by Table + Tags + Field. + Supports cross-file merging of same-named series. 
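    A usage sketch (the file names, the tag column 'city', and the series
    paths are hypothetical):

        with TsFileDataFrame(['day1.tsfile', 'day2.tsfile']) as tsdf:
            ts = tsdf['weather.Beijing.humidity']      # single Timeseries
            head = ts[0:100]                           # first 100 values
            beijing = tsdf[tsdf['city'] == 'Beijing']  # tag-filtered subset view
            res = tsdf.loc[1700000000000:1700003600000, [0, 1]]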
+ """ + + def __init__(self, paths: Union[str, List[str]], show_progress: bool = True): + if isinstance(paths, str): + paths = [paths] + + # Expand directories: collect all .tsfile files within each directory + expanded = [] + for p in paths: + if os.path.isdir(p): + tsfiles = sorted( + os.path.join(root, f) + for root, _, files in os.walk(p) + for f in files + if f.endswith('.tsfile') + ) + if not tsfiles: + raise FileNotFoundError( + f"No .tsfile files found in directory: {p}" + ) + expanded.extend(tsfiles) + else: + expanded.append(p) + + self._paths = [] + for p in expanded: + if not os.path.exists(p): + raise FileNotFoundError(f"TsFile not found: {p}") + self._paths.append(os.path.abspath(p)) + + self._show_progress = show_progress + self._readers = {} + self._series_list: List[str] = [] + self._series_map: Dict[str, list] = {} # series_path -> [(reader, info), ...] + self._merged_timestamps: Dict[str, np.ndarray] = {} + self._merged_info: Dict[str, dict] = {} + self._name_to_index: Dict[str, int] = {} + + self._is_view = False + self._root = None + + self._load_metadata() + + @classmethod + def _from_subset(cls, parent: 'TsFileDataFrame', series_names: List[str]) -> 'TsFileDataFrame': + """Create a lightweight view over a subset of series. + + Shares readers, series_map, merged_timestamps, and merged_info + with the root. Does NOT own any readers and will not close them. + """ + obj = object.__new__(cls) + obj._root = parent._root if parent._is_view else parent + obj._is_view = True + obj._paths = parent._paths + obj._readers = parent._readers + obj._series_map = parent._series_map + obj._merged_timestamps = parent._merged_timestamps + obj._merged_info = parent._merged_info + obj._show_progress = parent._show_progress + obj._series_list = list(series_names) + obj._name_to_index = {name: i for i, name in enumerate(series_names)} + return obj + + def _load_metadata(self): + """Load metadata from all TsFile files.""" + from .tsfile_series_reader import TsFileSeriesReader + + if len(self._paths) >= 2: + self._load_metadata_parallel(TsFileSeriesReader) + else: + self._load_metadata_serial(TsFileSeriesReader) + + # Build ordered series list and merged metadata + seen = set() + for file_path in self._paths: + reader = self._readers[file_path] + for series_path in reader.series_paths: + if series_path not in seen: + seen.add(series_path) + self._series_list.append(series_path) + self._name_to_index[series_path] = len(self._series_list) - 1 + self._build_merged_info(series_path) + + if not self._series_list: + raise ValueError("No valid time series found in the provided TsFile files") + + def _load_metadata_serial(self, ReaderClass): + """Load metadata from files sequentially.""" + for file_path in self._paths: + reader = ReaderClass(file_path, show_progress=self._show_progress) + self._register_reader(file_path, reader) + + def _load_metadata_parallel(self, ReaderClass): + """Load metadata from files in parallel using threads.""" + from concurrent.futures import ThreadPoolExecutor, as_completed + + def _open_file(file_path): + return file_path, ReaderClass(file_path, show_progress=False) + + num_workers = min(len(self._paths), os.cpu_count() or 4) + total = len(self._paths) + with ThreadPoolExecutor(max_workers=num_workers) as executor: + futures = {executor.submit(_open_file, p): p for p in self._paths} + results = {} + done_count = 0 + for future in as_completed(futures): + file_path, reader = future.result() + results[file_path] = reader + done_count += 1 + if self._show_progress: + 
sys.stderr.write( + f"\rLoading TsFile shards: " + f"{done_count}/{total}" + ) + sys.stderr.flush() + + if self._show_progress and total > 0: + total_series = sum(len(r.series_paths) for r in results.values()) + sys.stderr.write( + f"\rLoading TsFile shards: " + f"{total}/{total} " + f"({total_series} series) " + f"... done\n" + ) + sys.stderr.flush() + + # Register in original path order to keep series ordering deterministic + for file_path in self._paths: + self._register_reader(file_path, results[file_path]) + + def _register_reader(self, file_path: str, reader): + """Register a reader and index its series.""" + self._readers[file_path] = reader + for series_path in reader.series_paths: + if series_path not in self._series_map: + self._series_map[series_path] = [] + self._series_map[series_path].append( + (reader, reader.series_info[series_path]) + ) + + def _build_merged_info(self, series_path: str): + """Merge timestamps and metadata from all readers for a given series.""" + entries = self._series_map[series_path] + # Use first entry for structural metadata (table/tag/field are identical across files) + _, first_info = entries[0] + + if len(entries) == 1: + reader, info = entries[0] + self._merged_timestamps[series_path] = reader._timestamps_cache[series_path] + self._merged_info[series_path] = { + 'table_name': info['table_name'], + 'tag_columns': info['tag_columns'], + 'tag_values': info['tag_values'], + 'field': info['column_name'], + 'min_time': info['min_time'], + 'max_time': info['max_time'], + 'count': info['length'], + } + else: + all_ts = [] + for reader, info in entries: + all_ts.append(reader._timestamps_cache[series_path]) + merged = np.unique(np.concatenate(all_ts)) + merged.sort() + self._merged_timestamps[series_path] = merged + self._merged_info[series_path] = { + 'table_name': first_info['table_name'], + 'tag_columns': first_info['tag_columns'], + 'tag_values': first_info['tag_values'], + 'field': first_info['column_name'], + 'min_time': int(merged[0]), + 'max_time': int(merged[-1]), + 'count': len(merged), + } + + def __len__(self) -> int: + return len(self._series_list) + + @property + def timeseries(self) -> List[dict]: + """Return metadata for each series as a list of dicts.""" + result = [] + for name in self._series_list: + info = self._merged_info[name] + entry = { + 'table': info['table_name'], + 'field': info['field'], + 'start_time': _format_timestamp(info['min_time']), + 'end_time': _format_timestamp(info['max_time']), + 'count': info['count'], + } + for col in info['tag_columns']: + entry[col] = info['tag_values'].get(col, '') + result.append(entry) + return result + + def list_timeseries(self, path_prefix: str = "") -> List[str]: + """List series paths, optionally filtered by prefix. + + Args: + path_prefix: If given, only series whose path starts with this prefix. + + Returns: + List of series path strings. + """ + if not path_prefix: + return list(self._series_list) + prefix = path_prefix if path_prefix.endswith('.') else path_prefix + '.' + return [name for name in self._series_list + if name.startswith(prefix) or name == path_prefix] + + def metadata(self): + """Return metadata for all time series as a pandas DataFrame. + + Each row represents one time series. 
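        The result filters like any pandas DataFrame; e.g. (the table name
        'weather' is hypothetical):

            md = tsdf.metadata()
            md[(md['table'] == 'weather') & (md['count'] > 1000)]
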
Columns are:
        - series_path : full dotted path (table[.tag_values...].field)
        - table : TsFile table name
        - <tag column> : one column per tag column, filled with the tag value
          (empty string if this series has no such tag)
        - field : measurement / field column name
        - start_time : earliest timestamp (ms, int)
        - end_time : latest timestamp (ms, int)
        - count : number of data points

        Returns:
            pandas.DataFrame
        """
        import pandas as pd

        tag_cols = self._collect_tag_columns()
        fixed_cols = ['series_path', 'table'] + tag_cols + ['field', 'start_time', 'end_time', 'count']

        rows = []
        for name in self._series_list:
            info = self._merged_info[name]
            row = {
                'series_path': name,
                'table': info['table_name'],
                'field': info['field'],
                'start_time': info['min_time'],
                'end_time': info['max_time'],
                'count': info['count'],
            }
            for tc in tag_cols:
                row[tc] = info['tag_values'].get(tc, '')
            rows.append(row)

        return pd.DataFrame(rows, columns=fixed_cols)

    def _get_timeseries(self, name: str) -> 'Timeseries':
        """Internal helper to create a Timeseries object."""
        return Timeseries(name, self._series_map[name], self._merged_timestamps[name])

    def __getitem__(self, key):
        """Access time series by index, name, slice, or list.

        - tsdf[0] -> Timeseries
        - tsdf['weather.Beijing.humidity'] -> Timeseries
        - tsdf[1:3] -> TsFileDataFrame (subset view)
        - tsdf[[0, 1, 2]] -> TsFileDataFrame (subset view)
        - tsdf[tsdf['start_time'] > xxx] -> TsFileDataFrame (subset view)
        """
        # pandas boolean Series from metadata() filtering
        try:
            import pandas as pd
            if isinstance(key, pd.Series) and key.dtype == bool:
                selected_names = [self._series_list[i] for i in key.index[key]]
                return TsFileDataFrame._from_subset(self, selected_names)
        except ImportError:
            pass

        if isinstance(key, (int, np.integer)):
            key = int(key)
            if key < 0:
                key = len(self._series_list) + key
            if key < 0 or key >= len(self._series_list):
                raise IndexError(f"Index {key} out of range [0, {len(self._series_list)})")
            name = self._series_list[key]
            return self._get_timeseries(name)

        elif isinstance(key, str):
            # If the key matches a series name in this dataframe's subset, return Timeseries.
            # Otherwise treat it as a metadata column name and return a pandas Series,
            # mirroring the behaviour of metadata()[key] so that:
            #     tsdf['ps_id'] == '10'        -> boolean pd.Series
            #     tsdf[tsdf['ps_id'] == '10']  -> TsFileDataFrame (subset)
            if key in self._name_to_index:
                return self._get_timeseries(key)
            # Check if it is a valid metadata column
            valid_cols = {'table', 'field', 'start_time', 'end_time', 'count'}
            valid_cols.update(self._collect_tag_columns())
            if key in valid_cols:
                import pandas as pd
                if key == 'table':
                    values = [self._merged_info[n]['table_name'] for n in self._series_list]
                elif key == 'field':
                    values = [self._merged_info[n]['field'] for n in self._series_list]
                elif key == 'start_time':
                    values = [self._merged_info[n]['min_time'] for n in self._series_list]
                elif key == 'end_time':
                    values = [self._merged_info[n]['max_time'] for n in self._series_list]
                elif key == 'count':
                    values = [self._merged_info[n]['count'] for n in self._series_list]
                else:
                    values = [self._merged_info[n]['tag_values'].get(key, '')
                              for n in self._series_list]
                return pd.Series(values, name=key)
            raise KeyError(f"Series not found: '{key}'. 
" + f"Use df.metadata() to see available metadata columns.") + + elif isinstance(key, slice): + indices = list(range(*key.indices(len(self._series_list)))) + selected_names = [self._series_list[i] for i in indices] + return TsFileDataFrame._from_subset(self, selected_names) + + elif isinstance(key, list): + selected_names = [] + for k in key: + if not isinstance(k, (int, np.integer)): + raise TypeError(f"List index must contain integers, got {type(k)}") + idx = int(k) + if idx < 0: + idx = len(self._series_list) + idx + if idx < 0 or idx >= len(self._series_list): + raise IndexError(f"Index {k} out of range [0, {len(self._series_list)})") + selected_names.append(self._series_list[idx]) + return TsFileDataFrame._from_subset(self, selected_names) + + else: + raise TypeError(f"Unsupported key type: {type(key)}") + + @property + def loc(self): + """Attribute-style access to the loc indexer.""" + return _LocIndexer(self) + + def _collect_tag_columns(self) -> List[str]: + """Return ordered list of all unique tag column names across all series.""" + seen = {} + for name in self._series_list: + for col in self._merged_info[name]['tag_columns']: + if col not in seen: + seen[col] = True + return list(seen.keys()) + + def _format_table(self, indices=None, max_rows=20) -> str: + """Format series metadata as an aligned text table.""" + if indices is None: + indices = range(len(self._series_list)) + + indices = list(indices) + total = len(indices) + + if total > max_rows: + show_indices = list(indices[:max_rows // 2]) + list(indices[-max_rows // 2:]) + truncated = True + else: + show_indices = indices + truncated = False + + # Determine which tag columns exist across shown rows + tag_cols = self._collect_tag_columns() + + rows = [] + for idx in show_indices: + name = self._series_list[idx] + info = self._merged_info[name] + row = { + 'index': idx, + 'table': info['table_name'], + 'field': info['field'], + 'start_time': _format_timestamp(info['min_time']), + 'end_time': _format_timestamp(info['max_time']), + 'count': info['count'], + } + for tc in tag_cols: + row[tc] = info['tag_values'].get(tc, '') + rows.append(row) + + if not rows: + return "Empty TsFileDataFrame" + + # Build ordered headers: index, table, tag_cols..., field, start_time, end_time, count + fixed_headers = ['', 'table'] + tag_cols + ['field', 'start_time', 'end_time', 'count'] + col_widths = {h: len(h) for h in fixed_headers} + col_widths[''] = max(len(str(r['index'])) for r in rows) + + for r in rows: + col_widths[''] = max(col_widths[''], len(str(r['index']))) + col_widths['table'] = max(col_widths['table'], len(r['table'])) + col_widths['field'] = max(col_widths['field'], len(r['field'])) + col_widths['start_time'] = max(col_widths['start_time'], len(r['start_time'])) + col_widths['end_time'] = max(col_widths['end_time'], len(r['end_time'])) + col_widths['count'] = max(col_widths['count'], len(str(r['count']))) + for tc in tag_cols: + col_widths[tc] = max(col_widths[tc], len(str(r[tc]))) + + header_line = " ".join(h.rjust(col_widths[h]) for h in fixed_headers) + lines = [header_line] + + half = len(rows) // 2 if truncated else len(rows) + for i, r in enumerate(rows): + if truncated and i == half: + lines.append("...") + parts = [ + str(r['index']).rjust(col_widths['']), + r['table'].rjust(col_widths['table']), + ] + for tc in tag_cols: + parts.append(str(r[tc]).rjust(col_widths[tc])) + parts += [ + r['field'].rjust(col_widths['field']), + r['start_time'].rjust(col_widths['start_time']), + r['end_time'].rjust(col_widths['end_time']), 
+ str(r['count']).rjust(col_widths['count']), + ] + lines.append(" ".join(parts)) + + return "\n".join(lines) + + def __repr__(self): + total = len(self._series_list) + if self._is_view: + root_total = len(self._root._series_list) + header = f"TsFileDataFrame({total} time series, subset of {root_total})\n" + else: + header = f"TsFileDataFrame({total} time series, {len(self._paths)} files)\n" + return header + self._format_table() + + def __str__(self): + return self.__repr__() + + def close(self): + """Close all underlying readers. + + No-op for subset views (they don't own readers). + """ + if self._is_view: + return + for reader in self._readers.values(): + reader.close() + self._readers.clear() + + def __del__(self): + try: + if not getattr(self, '_is_view', False): + self.close() + except Exception: + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() diff --git a/python/tsfile/tsfile_series_reader.py b/python/tsfile/tsfile_series_reader.py new file mode 100644 index 000000000..b78471b5c --- /dev/null +++ b/python/tsfile/tsfile_series_reader.py @@ -0,0 +1,440 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +High-performance TsFile Series Reader + +Optimized for time series data reading using the Arrow columnar API. +Wraps TsFileReaderPy (Cython) and provides series-level metadata +discovery, timestamp caching, and batch reads with TAG filtering. +""" + +import os +import sys +from typing import List, Dict, Optional, Tuple + +import numpy as np +import pyarrow.compute as pc + +from .constants import ColumnCategory +from .tsfile_reader import TsFileReaderPy + + +class TsFileSeriesReader: + """ + TsFile Series Reader + + Wrapper around the Cython TsFileReaderPy for reading TsFile data + at the series level. Supports TAG columns: a time series is uniquely + identified by Table + Tag values + Field column, producing series + paths like "weather.beijing.humidity". 
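    Example (the file path and series names are hypothetical):

        reader = TsFileSeriesReader('weather.tsfile', show_progress=False)
        reader.get_all_series()    # e.g. ['weather.beijing.humidity', ...]
        ts, vals = reader.read_series_by_time_range(
            'weather.beijing.humidity', 1700000000000, 1700003600000)
        reader.close()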
+ """ + + def __init__(self, file_path: str, show_progress: bool = True): + if not os.path.exists(file_path): + raise FileNotFoundError(f"TsFile not found: {file_path}") + + self.file_path = file_path + self.show_progress = show_progress + + try: + self._reader = TsFileReaderPy(file_path) + except Exception as e: + raise ValueError(f"Failed to open TsFile: {e}") + + self.series_paths: List[str] = [] + self.series_info: Dict[str, dict] = {} + self._timestamps_cache: Dict[str, np.ndarray] = {} + self._series_data_cache: Dict[str, np.ndarray] = {} + + self._cache_metadata() + + def __del__(self): + self.close() + + def close(self): + """Close the underlying Cython reader.""" + if hasattr(self, '_reader'): + try: + self._reader.close() + except Exception: + pass + + def _cache_metadata(self): + """Cache metadata from the TsFile.""" + try: + self._cache_metadata_table_model() + except Exception as e: + raise ValueError( + f"Failed to read TsFile metadata. " + f"Please ensure the TsFile is valid and readable. Error: {e}" + ) + + def _cache_metadata_table_model(self): + """ + Cache metadata using table model query via Arrow batch API. + + Unified logic for tables with or without TAG columns. + """ + table_schemas = self._reader.get_all_table_schemas() + if not table_schemas: + raise ValueError("No tables found in TsFile") + + self.series_paths = [] + table_names = list(table_schemas.keys()) + + # Progress tracking + total_rows = 0 + + for ti, table_name in enumerate(table_names): + table_schema = self._reader.get_table_schema(table_name) + + tag_columns = [] + field_columns = [] + for col_schema in table_schema.get_columns(): + col_name = col_schema.get_column_name() + col_category = col_schema.get_category() + if col_name.lower() == 'time': + continue + if col_category == ColumnCategory.TAG: + tag_columns.append(col_name) + elif col_category == ColumnCategory.FIELD: + field_columns.append(col_name) + + if not field_columns: + continue + + # Query TAG columns + first FIELD column to discover groups and timestamps + query_cols = tag_columns + [field_columns[0]] + + time_arrays = [] + tag_arrays = {tc: [] for tc in tag_columns} + + with self._reader.query_table_batch( + table_name, query_cols, batch_size=65536 + ) as rs: + while True: + arrow_table = rs.read_arrow_batch() + if arrow_table is None: + break + batch_rows = arrow_table.num_rows + total_rows += batch_rows + time_arrays.append(arrow_table.column('time').to_numpy()) + for tc in tag_columns: + tag_arrays[tc].append(arrow_table.column(tc).to_numpy()) + + if self.show_progress: + sys.stderr.write( + f"\rReading TsFile metadata: " + f"table {ti + 1}/{len(table_names)} " + f"[{table_name}] " + f"({total_rows:,} rows)" + ) + sys.stderr.flush() + + if not time_arrays: + continue + + all_times = np.concatenate(time_arrays).astype(np.int64) + + if tag_columns: + # Merge tag columns and group by unique tag combinations + all_tags = {tc: np.concatenate(tag_arrays[tc]) for tc in tag_columns} + + # Build a composite key for grouping + if len(tag_columns) == 1: + tag_key = all_tags[tag_columns[0]] + unique_keys = np.unique(tag_key) + for uk in unique_keys: + mask = tag_key == uk + tag_values = (uk,) if not isinstance(uk, tuple) else uk + self._register_tag_group( + table_name, tag_columns, tag_values, + field_columns, all_times[mask] + ) + else: + # Multiple tag columns: use structured approach + # Convert to list of tuples for grouping + n = len(all_times) + tag_tuples = [ + tuple(all_tags[tc][i] for tc in tag_columns) + for i in range(n) + ] + 
unique_tuples = list(dict.fromkeys(tag_tuples)) + for ut in unique_tuples: + mask = np.array([t == ut for t in tag_tuples], dtype=bool) + self._register_tag_group( + table_name, tag_columns, ut, + field_columns, all_times[mask] + ) + else: + # No TAG columns: single group + self._register_tag_group( + table_name, tag_columns, (), + field_columns, all_times + ) + + if self.show_progress and total_rows > 0: + sys.stderr.write( + f"\rReading TsFile metadata: " + f"{len(table_names)} table(s), " + f"{total_rows:,} rows, " + f"{len(self.series_paths)} series " + f"... done\n" + ) + sys.stderr.flush() + + if not self.series_paths: + raise ValueError("No valid numeric series found in TsFile") + + def _register_tag_group( + self, table_name: str, tag_columns: List[str], + tag_values: tuple, field_columns: List[str], timestamps: np.ndarray + ): + """Register all field series for a given table + tag group.""" + timestamps = np.sort(timestamps) + + if len(timestamps) == 0: + return + + if tag_columns: + tag_part = ".".join(str(v) for v in tag_values) + else: + tag_part = "" + + tag_values_dict = dict(zip(tag_columns, tag_values)) if tag_columns else {} + + for field_col in field_columns: + if tag_part: + series_path = f"{table_name}.{tag_part}.{field_col}" + else: + series_path = f"{table_name}.{field_col}" + + self.series_paths.append(series_path) + self._timestamps_cache[series_path] = timestamps + self.series_info[series_path] = { + 'length': len(timestamps), + 'min_time': int(timestamps[0]), + 'max_time': int(timestamps[-1]), + 'table_name': table_name, + 'column_name': field_col, + 'tag_columns': tag_columns, + 'tag_values': tag_values_dict, + } + + def get_all_series(self) -> List[str]: + """Return a list of all discovered series paths.""" + return self.series_paths.copy() + + def get_series_length(self, series_path: str) -> int: + """Return the number of data points for a series.""" + if series_path not in self.series_info: + raise ValueError(f"Series not found: {series_path}") + return self.series_info[series_path]['length'] + + def read_series(self, series_path: str) -> List[float]: + """Read all data points for a series. + + Args: + series_path: Time series path. + + Returns: + List of data points. + """ + if series_path not in self.series_info: + raise ValueError(f"Series not found: {series_path}") + if series_path in self._series_data_cache: + return self._series_data_cache[series_path].tolist() + length = self.series_info[series_path]['length'] + return self.read_series_range(series_path, 0, length) + + def read_series_range(self, series_path: str, start: int, end: int) -> List[float]: + """Read specified range of time series by row index. + + Args: + series_path: Time series path. + start: Start index (inclusive). + end: End index (exclusive). + + Returns: + List of data points. 
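        Example (the series name is hypothetical): rows [10, 20) are
        resolved to their cached timestamps and fetched with a single
        time-range query:

            vals = reader.read_series_range('weather.beijing.humidity', 10, 20)
            len(vals)   # 10 points, covering rows 10..19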
+ """ + if series_path not in self.series_info: + raise ValueError(f"Series not found: {series_path}") + + if series_path in self._series_data_cache: + return self._series_data_cache[series_path][start:end].tolist() + + info = self.series_info[series_path] + timestamps = self._timestamps_cache[series_path] + + start_time = int(timestamps[start]) + end_time = int(timestamps[end - 1]) + + _, vals = self._read_arrow( + info['table_name'], + [info['column_name']], + info['tag_columns'], + info['tag_values'], + start_time, end_time, + ) + return vals[info['column_name']].tolist() + + def read_series_by_time_range( + self, series_path: str, start_time: int, end_time: int + ) -> Tuple[np.ndarray, np.ndarray]: + """Read data by time range directly (for loc-style queries). + + Args: + series_path: Time series path. + start_time: Start timestamp (inclusive, ms). + end_time: End timestamp (inclusive, ms). + + Returns: + Tuple of (timestamps_array, values_array). + """ + if series_path not in self.series_info: + raise ValueError(f"Series not found: {series_path}") + + info = self.series_info[series_path] + ts_arr, field_vals = self._read_arrow( + info['table_name'], + [info['column_name']], + info['tag_columns'], + info['tag_values'], + start_time, end_time, + ) + if len(ts_arr) > 0: + return ts_arr, field_vals[info['column_name']] + return np.array([], dtype=np.int64), np.array([], dtype=np.float64) + + def read_multi_series_by_time_range( + self, + table_name: str, + field_columns: List[str], + tag_columns: List[str], + tag_values: Dict[str, str], + start_time: int, + end_time: int, + ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: + """Read multiple field columns from the same table+tag group in one query. + + Args: + table_name: TsFile table name. + field_columns: List of field column names to read. + tag_columns: List of tag column names. + tag_values: Dict of tag column name to tag value. + start_time: Start timestamp (inclusive, ms). + end_time: End timestamp (inclusive, ms). + + Returns: + (timestamps_array, {field_name: values_array}). + """ + return self._read_arrow( + table_name, field_columns, tag_columns, tag_values, + start_time, end_time, + ) + + def _read_arrow( + self, + table_name: str, + field_columns: List[str], + tag_columns: List[str], + tag_values: Dict[str, str], + start_time: int, + end_time: int, + ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]: + """Core Arrow batch reader. + + Read one or more field columns from a single table+tag group + via query_table_batch + read_arrow_batch. + + Args: + table_name: TsFile table name. + field_columns: Field columns to read. + tag_columns: Tag column names (for query). + tag_values: Tag filter values. + start_time: Start timestamp (inclusive, ms). + end_time: End timestamp (inclusive, ms). + + Returns: + (timestamps_array, {field_name: values_array}). 
+ """ + if tag_columns: + query_cols = tag_columns + field_columns + else: + query_cols = list(field_columns) + + ts_list = [] + field_lists = {fc: [] for fc in field_columns} + + with self._reader.query_table_batch( + table_name, query_cols, + start_time=start_time, end_time=end_time, batch_size=65536 + ) as rs: + while True: + arrow_table = rs.read_arrow_batch() + if arrow_table is None: + break + + if tag_values: + mask = None + for tag_col, tag_val in tag_values.items(): + col_mask = pc.equal(arrow_table.column(tag_col), tag_val) + mask = col_mask if mask is None else pc.and_(mask, col_mask) + arrow_table = arrow_table.filter(mask) + + if arrow_table.num_rows > 0: + ts_list.append(arrow_table.column('time').to_numpy()) + for fc in field_columns: + field_lists[fc].append( + arrow_table.column(fc).to_numpy().astype(np.float64) + ) + + if ts_list: + return ( + np.concatenate(ts_list).astype(np.int64), + {fc: np.concatenate(field_lists[fc]) for fc in field_columns}, + ) + return ( + np.array([], dtype=np.int64), + {fc: np.array([], dtype=np.float64) for fc in field_columns}, + ) + + def cache_series_data(self, series_path: str): + """Pre-load series data into memory cache. + + Args: + series_path: Time series path. + """ + if series_path not in self.series_info: + raise ValueError(f"Series not found: {series_path}") + if series_path not in self._series_data_cache: + data = self.read_series(series_path) + self._series_data_cache[series_path] = np.array(data, dtype=np.float32) + + def is_series_cached(self, series_path: str) -> bool: + """Check if a series has its data pre-loaded in cache. + + Args: + series_path: Time series path. + + Returns: + True if the series data is cached. + """ + return series_path in self._series_data_cache