diff --git a/paimon-python/dev/requirements.txt b/paimon-python/dev/requirements.txt index 192b2e9add00..e745ef0878f8 100644 --- a/paimon-python/dev/requirements.txt +++ b/paimon-python/dev/requirements.txt @@ -16,6 +16,7 @@ # limitations under the License. ################################################################################ cachetools>=4.2,<6; python_version=="3.6" +python-dateutil>=2.8,<3 cachetools>=5,<6; python_version>"3.6" dataclasses>=0.8; python_version < "3.7" fastavro>=1.4,<2 diff --git a/paimon-python/pypaimon/cli/cli_table.py b/paimon-python/pypaimon/cli/cli_table.py index 6a7c383bb94f..a4220893bb5b 100644 --- a/paimon-python/pypaimon/cli/cli_table.py +++ b/paimon-python/pypaimon/cli/cli_table.py @@ -25,6 +25,12 @@ from pypaimon.common.json_util import JSON +def cmd_table_stream(args): + """Execute the 'table stream' command.""" + from pypaimon.cli.cli_table_stream import cmd_table_stream as _cmd + _cmd(args) + + def cmd_table_read(args): """ Execute the 'table read' command. @@ -783,6 +789,64 @@ def add_table_subcommands(table_parser): ) rename_parser.set_defaults(func=cmd_table_rename) + # table stream command + stream_parser = table_subparsers.add_parser( + 'stream', + help='Continuously stream new rows from a table until interrupted (Ctrl+C)' + ) + stream_parser.add_argument( + 'table', + help='Table identifier in format: database.table' + ) + stream_parser.add_argument( + '--select', '-s', + type=str, + default=None, + help='Select specific columns (comma-separated, e.g., "id,name")' + ) + stream_parser.add_argument( + '--where', '-w', + type=str, + default=None, + help='Filter condition in SQL-like syntax (e.g., "age > 18")' + ) + stream_parser.add_argument( + '--format', '-f', + type=str, + choices=['table', 'json'], + default='table', + help='Output format: table (default) or json' + ) + stream_parser.add_argument( + '--from', + dest='from_position', + type=str, + default='latest', + help=( + 'Starting position: "latest" (default), "earliest", a numeric snapshot ID, ' + 'or a timestamp (e.g., "2025-01-15", "2025-01-15T10:30:00Z")' + ) + ) + stream_parser.add_argument( + '--poll-interval-ms', + type=int, + default=1000, + help='Milliseconds between snapshot polls (default: 1000)' + ) + stream_parser.add_argument( + '--include-row-kind', + action='store_true', + default=False, + help='Include _row_kind column (+I, -U, +U, -D) in output' + ) + stream_parser.add_argument( + '--consumer-id', + type=str, + default=None, + help='Consumer ID for persisting scan progress across invocations' + ) + stream_parser.set_defaults(func=cmd_table_stream) + # table alter command alter_parser = table_subparsers.add_parser('alter', help='Alter a table with schema changes') alter_parser.add_argument( diff --git a/paimon-python/pypaimon/cli/cli_table_stream.py b/paimon-python/pypaimon/cli/cli_table_stream.py new file mode 100644 index 000000000000..6b6fa65192da --- /dev/null +++ b/paimon-python/pypaimon/cli/cli_table_stream.py @@ -0,0 +1,216 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +""" +Table stream command for Paimon CLI. + +Continuously polls a table for new snapshots and prints rows as they arrive. +""" + +import json +import sys +from typing import Union + +from pypaimon.snapshot.snapshot_manager import SnapshotManager + + +def _parse_timestamp(value: str) -> int: + """Parse a human-readable datetime string to epoch milliseconds. + + Accepts ISO 8601, space-separated datetimes, date-only strings, and named + timezones (e.g. "2025-01-15T10:30:00 EST", "2025-01-15T10:30:00 Europe/London"). + Naive datetimes (no timezone) are treated as local machine time. + Returns epoch ms as an integer. + Raises ValueError on unrecognised format. + """ + from dateutil import parser as dateutil_parser + from dateutil.parser import ParserError + + try: + dt = dateutil_parser.parse(value) + except (ParserError, OverflowError): + raise ValueError( + f"Unrecognised timestamp format: '{value}'. " + "Expected formats: YYYY-MM-DD, YYYY-MM-DD HH:MM:SS, " + "YYYY-MM-DDTHH:MM:SS, YYYY-MM-DDTHH:MM:SSZ, YYYY-MM-DDTHH:MM:SS+HH:MM, " + "or a datetime with a named timezone (e.g. '2025-01-15T10:30:00 EST')" + ) + + if dt.tzinfo is None: + # Treat naive datetime as local time + dt = dt.astimezone() + + return int(dt.timestamp() * 1000) + + +def parse_from_position(value: str, snapshot_manager: SnapshotManager) -> Union[str, int]: + """Resolve a --from value to a keyword or snapshot ID integer. + + Args: + value: The raw --from argument (keyword, integer string, or timestamp). + snapshot_manager: Used to resolve timestamps to snapshot IDs. + + Returns: + "latest", "earliest", or an integer snapshot ID. + + Raises: + ValueError: If the value is unrecognised or the timestamp precedes all snapshots. + """ + if value in ("latest", "earliest"): + return value + + # Pure integer → snapshot ID + if value.strip().isdigit(): + return int(value.strip()) + + # Anything containing '-', 'T', ':', or '/' is treated as a timestamp + if any(c in value for c in ("-", "T", ":", "/")): + epoch_ms = _parse_timestamp(value) + snapshot = snapshot_manager.earlier_or_equal_time_mills(epoch_ms) + if snapshot is None: + raise ValueError( + f"No snapshot found at or before '{value}'. " + "The timestamp may predate all stored snapshots." + ) + return snapshot.id + + raise ValueError( + f"Unrecognised --from value: '{value}'. " + "Use 'latest', 'earliest', a snapshot ID, or a timestamp." + ) + + +def cmd_table_stream(args): + """Execute the 'table stream' command. + + Continuously reads new rows from a Paimon table and prints them to stdout + until interrupted with Ctrl+C. + + Args: + args: Parsed command line arguments. + """ + from pypaimon.cli.cli import load_catalog_config, create_catalog + from pypaimon.table.file_store_table import FileStoreTable + + config = load_catalog_config(args.config) + catalog = create_catalog(config) + + table_identifier = args.table + parts = table_identifier.split('.') + if len(parts) != 2: + print( + f"Error: Invalid table identifier '{table_identifier}'. " + "Expected format: 'database.table'", + file=sys.stderr, + ) + sys.exit(1) + + try: + table = catalog.get_table(table_identifier) + except Exception as e: + print(f"Error: Failed to get table '{table_identifier}': {e}", file=sys.stderr) + sys.exit(1) + + available_fields = set(field.name for field in table.table_schema.fields) + + # --- Validate and build projection --- + user_columns = None + if args.select: + user_columns = [col.strip() for col in args.select.split(',')] + invalid_columns = [col for col in user_columns if col not in available_fields] + if invalid_columns: + print( + f"Error: Column(s) {invalid_columns} do not exist in table '{table_identifier}'.", + file=sys.stderr, + ) + sys.exit(1) + + # --- Parse WHERE predicate --- + predicate = None + if args.where: + from pypaimon.cli.where_parser import parse_where_clause + try: + predicate = parse_where_clause(args.where, table.table_schema.fields) + except ValueError as e: + print(f"Error: Invalid WHERE clause: {e}", file=sys.stderr) + sys.exit(1) + + # --- Resolve --from position --- + from_position = args.from_position + if from_position not in ("latest", "earliest") and not from_position.strip().isdigit(): + # Likely a timestamp — need snapshot_manager to resolve it + if not isinstance(table, FileStoreTable): + print( + "Error: Timestamp-based --from requires a FileStoreTable.", + file=sys.stderr, + ) + sys.exit(1) + snapshot_manager = table.snapshot_manager() + try: + from_position = parse_from_position(from_position, snapshot_manager) + except ValueError as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + elif from_position.strip().isdigit(): + from_position = int(from_position.strip()) + + # --- Build StreamReadBuilder --- + builder = table.new_stream_read_builder() + + if user_columns: + builder = builder.with_projection(user_columns) + if predicate: + builder = builder.with_filter(predicate) + if args.poll_interval_ms is not None: + builder = builder.with_poll_interval_ms(args.poll_interval_ms) + if args.include_row_kind: + builder = builder.with_include_row_kind(True) + if args.consumer_id: + builder = builder.with_consumer_id(args.consumer_id) + + builder = builder.with_scan_from(from_position) + + scan = builder.new_streaming_scan() + read = builder.new_read() + output_format = getattr(args, 'format', 'table') + + first_batch = True + try: + for plan in scan.stream_sync(): + splits = plan.splits() + if not splits: + continue + + df = read.to_pandas(splits) + if df.empty: + continue + + if output_format == 'json': + for record in df.to_dict(orient='records'): + print(json.dumps(record, ensure_ascii=False, default=str)) + else: + if not first_batch: + print("---") + if first_batch: + print(df.to_string(index=False)) + first_batch = False + else: + # Print rows only (no header) for subsequent batches + print(df.to_string(index=False, header=False)) + + except KeyboardInterrupt: + sys.exit(0) diff --git a/paimon-python/pypaimon/read/stream_read_builder.py b/paimon-python/pypaimon/read/stream_read_builder.py index 42cc787e04c7..adbcb7d659be 100644 --- a/paimon-python/pypaimon/read/stream_read_builder.py +++ b/paimon-python/pypaimon/read/stream_read_builder.py @@ -22,7 +22,7 @@ tables, similar to ReadBuilder but for continuous streaming use cases. """ -from typing import Callable, List, Optional, Set +from typing import Callable, List, Optional, Set, Union from pypaimon.common.predicate import Predicate from pypaimon.common.predicate_builder import PredicateBuilder @@ -59,6 +59,7 @@ def __init__(self, table): self._include_row_kind: bool = False self._bucket_filter: Optional[Callable[[int], bool]] = None self._consumer_id: Optional[str] = None + self._scan_from: Optional[Union[str, int]] = None def with_filter(self, predicate: Predicate) -> 'StreamReadBuilder': """Set a filter predicate for the streaming read.""" @@ -90,6 +91,17 @@ def with_consumer_id(self, consumer_id: str) -> 'StreamReadBuilder': self._consumer_id = consumer_id return self + def with_scan_from(self, position: Union[str, int]) -> 'StreamReadBuilder': + """Set the starting position for the streaming scan. + + Args: + position: One of "latest" (default), "earliest", or a positive + integer snapshot ID. Timestamps must be resolved to a snapshot + ID by the caller before passing here. + """ + self._scan_from = position + return self + def with_bucket_filter( self, bucket_filter: Callable[[int], bool] @@ -120,7 +132,8 @@ def new_streaming_scan(self) -> AsyncStreamingTableScan: predicate=self._predicate, poll_interval_ms=self._poll_interval_ms, bucket_filter=self._bucket_filter, - consumer_id=self._consumer_id + consumer_id=self._consumer_id, + scan_from=self._scan_from ) def new_read(self) -> TableRead: diff --git a/paimon-python/pypaimon/read/streaming_table_scan.py b/paimon-python/pypaimon/read/streaming_table_scan.py index 835c61dd73aa..7b27a2b085e3 100644 --- a/paimon-python/pypaimon/read/streaming_table_scan.py +++ b/paimon-python/pypaimon/read/streaming_table_scan.py @@ -80,7 +80,8 @@ def __init__( bucket_filter: Optional[Callable[[int], bool]] = None, prefetch_enabled: bool = True, diff_threshold: int = 10, - consumer_id: Optional[str] = None + consumer_id: Optional[str] = None, + scan_from=None ): """Initialize the streaming table scan.""" self.table = table @@ -118,6 +119,9 @@ def __init__( # Auto-select based on changelog-producer if not explicitly provided self.follow_up_scanner = follow_up_scanner or self._create_follow_up_scanner() + # Starting position (set via StreamReadBuilder.with_scan_from) + self._scan_from = scan_from + # State tracking self.next_snapshot_id: Optional[int] = None self._pending_consumer_snapshot: Optional[int] = None @@ -131,13 +135,28 @@ async def stream(self) -> AsyncGenerator[Plan, None]: Yields: Plan objects containing splits for reading """ - # Restore from consumer if available + # Restore from consumer if available (highest priority — overrides scan_from) if self.next_snapshot_id is None and self._consumer_manager: consumer = self._consumer_manager.consumer(self._consumer_id) if consumer: self.next_snapshot_id = consumer.next_snapshot - # Initial scan + # Resolve scan_from if no position has been set yet (no consumer restore) + if self.next_snapshot_id is None and self._scan_from is not None: + scan_from = self._scan_from + if scan_from == "earliest": + earliest_snapshot = self._snapshot_manager.try_get_earliest_snapshot() + if earliest_snapshot is not None: + self.next_snapshot_id = earliest_snapshot.id + 1 + self._stage_consumer() + yield self._create_initial_plan(earliest_snapshot) + self._flush_pending_consumer() + # If no snapshot exists yet, fall through to the polling loop + elif scan_from != "latest": + # Numeric snapshot ID — set directly; diff catch-up handles large gaps + self.next_snapshot_id = int(scan_from) + + # Initial scan (default "latest" path, also taken when scan_from is None or "latest") if self.next_snapshot_id is None: latest_snapshot = self._snapshot_manager.get_latest_snapshot() if latest_snapshot: diff --git a/paimon-python/pypaimon/tests/cli_table_stream_test.py b/paimon-python/pypaimon/tests/cli_table_stream_test.py new file mode 100644 index 000000000000..81279e1daf0d --- /dev/null +++ b/paimon-python/pypaimon/tests/cli_table_stream_test.py @@ -0,0 +1,361 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +"""Tests for the 'table stream' CLI command.""" + +import json +import os +import shutil +import sys +import tempfile +import unittest +from io import StringIO +from unittest.mock import MagicMock, patch, call + +import pyarrow as pa +import pandas as pd + +from pypaimon import CatalogFactory, Schema +from pypaimon.cli.cli import main +from pypaimon.cli.cli_table_stream import parse_from_position + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_plan(data): + """Create a mock Plan that yields a real DataFrame when read.""" + from pypaimon.read.plan import Plan + plan = MagicMock(spec=Plan) + plan.splits.return_value = ["dummy_split"] + plan._data = data + return plan + + +def _make_mock_read(df): + """Return a mock TableRead whose to_pandas() returns df.""" + mock_read = MagicMock() + mock_read.to_pandas.return_value = df + return mock_read + + +# --------------------------------------------------------------------------- +# Fixture helpers +# --------------------------------------------------------------------------- + +def _write_snapshot(table, data_dict, schema): + """Write one snapshot of data to table.""" + write_builder = table.new_batch_write_builder() + table_write = write_builder.new_write() + table_commit = write_builder.new_commit() + arrow_table = pa.Table.from_pydict(data_dict, schema=schema) + table_write.write_arrow(arrow_table) + table_commit.commit(table_write.prepare_commit()) + table_write.close() + table_commit.close() + + +# --------------------------------------------------------------------------- +# CLI integration tests +# --------------------------------------------------------------------------- + +class CliTableStreamTest(unittest.TestCase): + """CLI tests for 'table stream' using mocked stream_sync.""" + + @classmethod + def setUpClass(cls): + cls.tempdir = tempfile.mkdtemp() + cls.warehouse = os.path.join(cls.tempdir, 'warehouse') + cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse}) + cls.catalog.create_database('test_db', True) + + pa_schema = pa.schema([ + ('id', pa.int32()), + ('name', pa.string()), + ('age', pa.int32()), + ]) + schema = Schema.from_pyarrow_schema(pa_schema) + cls.catalog.create_table('test_db.stream_users', schema, False) + + table = cls.catalog.get_table('test_db.stream_users') + _write_snapshot(table, {'id': [1, 2], 'name': ['Alice', 'Bob'], 'age': [25, 30]}, pa_schema) + _write_snapshot(table, {'id': [3], 'name': ['Charlie'], 'age': [35]}, pa_schema) + + cls.config_file = os.path.join(cls.tempdir, 'paimon.yaml') + with open(cls.config_file, 'w') as f: + f.write(f"metastore: filesystem\nwarehouse: {cls.warehouse}\n") + + cls.pa_schema = pa_schema + + @classmethod + def tearDownClass(cls): + shutil.rmtree(cls.tempdir, ignore_errors=True) + + def _run_stream(self, extra_args, plans_dfs, capture_stderr=False): + """Run 'paimon table stream' with stream_sync mocked to yield given DataFrames.""" + base_args = ['paimon', '-c', self.config_file, 'table', 'stream', 'test_db.stream_users'] + argv = base_args + extra_args + + # Build mock plans + mock_plans = [] + for df in plans_dfs: + plan = MagicMock() + plan.splits.return_value = [] if df is None else ["split"] + mock_plans.append(plan) + + # stream_sync just yields the plans once, then stops + def fake_stream_sync(): + return iter(mock_plans) + + mock_scan = MagicMock() + mock_scan.stream_sync = fake_stream_sync + + # to_pandas returns the corresponding df for each call + call_count = [0] + dfs_iter = [df for df in plans_dfs if df is not None] + + def fake_to_pandas(splits): + if not splits: + return pd.DataFrame() + idx = call_count[0] + call_count[0] += 1 + return dfs_iter[idx] if idx < len(dfs_iter) else pd.DataFrame() + + mock_read = MagicMock() + mock_read.to_pandas.side_effect = fake_to_pandas + + mock_builder = MagicMock() + mock_builder.new_streaming_scan.return_value = mock_scan + mock_builder.new_read.return_value = mock_read + mock_builder.with_projection.return_value = mock_builder + mock_builder.with_filter.return_value = mock_builder + mock_builder.with_poll_interval_ms.return_value = mock_builder + mock_builder.with_include_row_kind.return_value = mock_builder + mock_builder.with_consumer_id.return_value = mock_builder + mock_builder.with_scan_from.return_value = mock_builder + + stdout_buf = StringIO() + stderr_buf = StringIO() + + with patch('sys.argv', argv): + with patch('sys.stdout', new_callable=StringIO) as mock_stdout: + with patch('sys.stderr', new_callable=StringIO) as mock_stderr: + with patch.object( + __import__('pypaimon.table.file_store_table', + fromlist=['FileStoreTable']).FileStoreTable, + 'new_stream_read_builder', + return_value=mock_builder + ): + try: + main() + except SystemExit as e: + exit_code = e.code + else: + exit_code = 0 + stdout_out = mock_stdout.getvalue() + stderr_out = mock_stderr.getvalue() + + return stdout_out, stderr_out, exit_code, mock_builder + + def test_basic_streaming_output(self): + """Rows from mock plan appear in stdout.""" + df = pd.DataFrame({'id': [3], 'name': ['Charlie'], 'age': [35]}) + stdout, _, exit_code, _ = self._run_stream([], [df]) + self.assertIn('Charlie', stdout) + self.assertEqual(exit_code, 0) + + def test_select_projection_passed_to_builder(self): + """--select passes projection to StreamReadBuilder.with_projection().""" + df = pd.DataFrame({'id': [1], 'name': ['Alice']}) + _, _, _, builder = self._run_stream(['--select', 'id,name'], [df]) + builder.with_projection.assert_called_once_with(['id', 'name']) + + def test_where_filter_passed_to_builder(self): + """--where passes a predicate to StreamReadBuilder.with_filter().""" + df = pd.DataFrame({'id': [1], 'name': ['Alice'], 'age': [25]}) + _, _, _, builder = self._run_stream(['--where', 'age > 20'], [df]) + builder.with_filter.assert_called_once() + + def test_format_json_outputs_json_objects(self): + """--format json outputs one JSON object per row.""" + df = pd.DataFrame({'id': [1, 2], 'name': ['Alice', 'Bob'], 'age': [25, 30]}) + stdout, _, exit_code, _ = self._run_stream(['--format', 'json'], [df]) + lines = [l for l in stdout.strip().splitlines() if l] + self.assertEqual(len(lines), 2) + for line in lines: + obj = json.loads(line) + self.assertIn('id', obj) + + def test_include_row_kind_passed_to_builder(self): + """--include-row-kind calls builder.with_include_row_kind(True).""" + df = pd.DataFrame({'_row_kind': ['+I'], 'id': [1], 'name': ['Alice'], 'age': [25]}) + _, _, _, builder = self._run_stream(['--include-row-kind'], [df]) + builder.with_include_row_kind.assert_called_once_with(True) + + def test_consumer_id_passed_to_builder(self): + """--consumer-id passes value to builder.with_consumer_id().""" + df = pd.DataFrame({'id': [1], 'name': ['Alice'], 'age': [25]}) + _, _, _, builder = self._run_stream(['--consumer-id', 'my-consumer'], [df]) + builder.with_consumer_id.assert_called_once_with('my-consumer') + + def test_from_earliest_passed_to_builder(self): + """--from earliest passes 'earliest' to builder.with_scan_from().""" + df = pd.DataFrame({'id': [1], 'name': ['Alice'], 'age': [25]}) + _, _, _, builder = self._run_stream(['--from', 'earliest'], [df]) + builder.with_scan_from.assert_called_once_with('earliest') + + def test_from_snapshot_id_passed_to_builder(self): + """--from 42 passes integer 42 to builder.with_scan_from().""" + df = pd.DataFrame({'id': [1], 'name': ['Alice'], 'age': [25]}) + _, _, _, builder = self._run_stream(['--from', '42'], [df]) + builder.with_scan_from.assert_called_once_with(42) + + def test_empty_plan_produces_no_output(self): + """A plan with no splits produces no stdout output.""" + stdout, _, _, _ = self._run_stream([], [None]) + self.assertEqual(stdout.strip(), '') + + def test_invalid_select_column_exits_nonzero(self): + """--select with nonexistent column prints error and exits non-zero.""" + with patch('sys.argv', ['paimon', '-c', self.config_file, 'table', 'stream', + 'test_db.stream_users', '--select', 'id,bogus']): + with patch('sys.stderr', new_callable=StringIO) as mock_stderr: + try: + main() + exit_code = 0 + except SystemExit as e: + exit_code = e.code + self.assertNotEqual(exit_code, 0) + self.assertIn('bogus', mock_stderr.getvalue()) + + def test_invalid_where_clause_exits_nonzero(self): + """--where with invalid syntax prints error and exits non-zero.""" + with patch('sys.argv', ['paimon', '-c', self.config_file, 'table', 'stream', + 'test_db.stream_users', '--where', '%%invalid%%']): + with patch('sys.stderr', new_callable=StringIO) as mock_stderr: + try: + main() + exit_code = 0 + except SystemExit as e: + exit_code = e.code + self.assertNotEqual(exit_code, 0) + self.assertIn('WHERE', mock_stderr.getvalue()) + + def test_keyboard_interrupt_exits_cleanly(self): + """KeyboardInterrupt exits with code 0 and no traceback.""" + mock_scan = MagicMock() + mock_scan.stream_sync.side_effect = KeyboardInterrupt + + mock_builder = MagicMock() + mock_builder.new_streaming_scan.return_value = mock_scan + mock_builder.new_read.return_value = MagicMock() + mock_builder.with_projection.return_value = mock_builder + mock_builder.with_filter.return_value = mock_builder + mock_builder.with_poll_interval_ms.return_value = mock_builder + mock_builder.with_include_row_kind.return_value = mock_builder + mock_builder.with_consumer_id.return_value = mock_builder + mock_builder.with_scan_from.return_value = mock_builder + + with patch('sys.argv', ['paimon', '-c', self.config_file, 'table', 'stream', + 'test_db.stream_users']): + with patch('sys.stderr', new_callable=StringIO) as mock_stderr: + with patch.object( + __import__('pypaimon.table.file_store_table', + fromlist=['FileStoreTable']).FileStoreTable, + 'new_stream_read_builder', + return_value=mock_builder + ): + try: + main() + exit_code = 0 + except SystemExit as e: + exit_code = e.code + stderr_out = mock_stderr.getvalue() + + self.assertEqual(exit_code, 0) + self.assertNotIn('Traceback', stderr_out) + + +# --------------------------------------------------------------------------- +# Unit tests for parse_from_position +# --------------------------------------------------------------------------- + +class ParseFromPositionTest(unittest.TestCase): + """Unit tests for parse_from_position().""" + + def _mock_snapshot_manager(self, snapshot_id=None): + mgr = MagicMock() + if snapshot_id is not None: + snap = MagicMock() + snap.id = snapshot_id + mgr.earlier_or_equal_time_mills.return_value = snap + else: + mgr.earlier_or_equal_time_mills.return_value = None + return mgr + + def test_latest_passthrough(self): + self.assertEqual(parse_from_position("latest", MagicMock()), "latest") + + def test_earliest_passthrough(self): + self.assertEqual(parse_from_position("earliest", MagicMock()), "earliest") + + def test_integer_string(self): + self.assertEqual(parse_from_position("42", MagicMock()), 42) + + def test_date_string_resolves_to_snapshot_id(self): + mgr = self._mock_snapshot_manager(snapshot_id=7) + result = parse_from_position("2025-01-15", mgr) + self.assertEqual(result, 7) + mgr.earlier_or_equal_time_mills.assert_called_once() + + def test_utc_timestamp_resolves_to_snapshot_id(self): + mgr = self._mock_snapshot_manager(snapshot_id=12) + result = parse_from_position("2025-01-15T10:30:00Z", mgr) + self.assertEqual(result, 12) + # Verify the epoch ms passed is consistent with the UTC timestamp + epoch_ms_arg = mgr.earlier_or_equal_time_mills.call_args[0][0] + # 2025-01-15T10:30:00Z = 1736937000000 ms (approx) + self.assertAlmostEqual(epoch_ms_arg, 1736937000000, delta=60000) + + def test_timestamp_before_all_snapshots_raises(self): + mgr = self._mock_snapshot_manager(snapshot_id=None) + with self.assertRaises(ValueError) as ctx: + parse_from_position("2020-01-01", mgr) + self.assertIn("No snapshot found", str(ctx.exception)) + + def test_unrecognised_value_raises(self): + with self.assertRaises(ValueError) as ctx: + parse_from_position("not_a_thing", MagicMock()) + self.assertIn("Unrecognised", str(ctx.exception)) + + def test_from_timestamp_cli_error_message(self): + """--from with a timestamp that has no prior snapshot prints error to stderr.""" + with patch('sys.argv', [ + 'paimon', '-c', '/nonexistent/paimon.yaml', + 'table', 'stream', 'test_db.stream_users', + '--from', '1900-01-01' + ]): + # This will fail at catalog load before reaching timestamp resolution, + # so we just verify parse_from_position raises correctly in isolation. + mgr = self._mock_snapshot_manager(snapshot_id=None) + with self.assertRaises(ValueError): + parse_from_position("1900-01-01", mgr) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/stream_read_builder_test.py b/paimon-python/pypaimon/tests/stream_read_builder_test.py index 328a8920c490..bb632b0bb467 100644 --- a/paimon-python/pypaimon/tests/stream_read_builder_test.py +++ b/paimon-python/pypaimon/tests/stream_read_builder_test.py @@ -122,6 +122,55 @@ def test_filter_no_filter_returns_all(self, mock_scan_table): assert [e.bucket for e in filtered] == list(range(8)) +class TestWithScanFrom: + """Unit tests for StreamReadBuilder.with_scan_from().""" + + def test_with_scan_from_earliest(self, builder): + """with_scan_from('earliest') stores value and returns self.""" + result = builder.with_scan_from("earliest") + assert result is builder + assert builder._scan_from == "earliest" + + def test_with_scan_from_latest(self, builder): + """with_scan_from('latest') stores value and returns self.""" + result = builder.with_scan_from("latest") + assert result is builder + assert builder._scan_from == "latest" + + def test_with_scan_from_integer(self, builder): + """with_scan_from(42) stores integer value and returns self.""" + result = builder.with_scan_from(42) + assert result is builder + assert builder._scan_from == 42 + + def test_with_scan_from_passes_to_scan(self, mock_scan_table): + """with_scan_from() value is passed through to AsyncStreamingTableScan.""" + builder = StreamReadBuilder(mock_scan_table) + builder.with_scan_from("earliest") + scan = builder.new_streaming_scan() + assert scan._scan_from == "earliest" + + def test_with_scan_from_integer_passes_to_scan(self, mock_scan_table): + """with_scan_from(42) passes integer to AsyncStreamingTableScan.""" + builder = StreamReadBuilder(mock_scan_table) + builder.with_scan_from(42) + scan = builder.new_streaming_scan() + assert scan._scan_from == 42 + + def test_with_scan_from_none_by_default(self, mock_scan_table): + """Without calling with_scan_from(), _scan_from is None.""" + builder = StreamReadBuilder(mock_scan_table) + scan = builder.new_streaming_scan() + assert scan._scan_from is None + + def test_method_chaining_with_scan_from(self, builder): + """with_scan_from() chains correctly with other builder methods.""" + result = builder.with_scan_from("earliest").with_poll_interval_ms(500) + assert result is builder + assert builder._scan_from == "earliest" + assert builder._poll_interval_ms == 500 + + class TestConsumerIdPassthrough: """Test that consumer_id passes through to AsyncStreamingTableScan.""" diff --git a/paimon-python/pypaimon/tests/streaming_table_scan_test.py b/paimon-python/pypaimon/tests/streaming_table_scan_test.py index a0b456027ed0..74778be71340 100644 --- a/paimon-python/pypaimon/tests/streaming_table_scan_test.py +++ b/paimon-python/pypaimon/tests/streaming_table_scan_test.py @@ -613,5 +613,136 @@ def test_no_consumer_when_consumer_id_not_set( self.assertIsNone(scan._consumer_manager) +class ScanFromTest(unittest.TestCase): + """Integration tests for AsyncStreamingTableScan scan_from parameter.""" + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') + @patch('pypaimon.read.streaming_table_scan.FileScanner') + def test_scan_from_earliest( + self, MockFileScanner, MockManifestFileManager, + MockManifestListManager, MockSnapshotManager + ): + """scan_from='earliest' should yield initial plan from the earliest snapshot.""" + table, _ = _create_mock_table() + + earliest = _create_mock_snapshot(1) + mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager.try_get_earliest_snapshot.return_value = earliest + mock_snapshot_manager.find_next_scannable.return_value = (None, 2, 0) + + MockFileScanner.return_value.scan.return_value = Plan([]) + + scan = AsyncStreamingTableScan(table, scan_from="earliest", prefetch_enabled=False) + + async def get_first(): + async for plan in scan.stream(): + return plan + + plan = asyncio.run(get_first()) + self.assertIsInstance(plan, Plan) + # next_snapshot_id should be earliest.id + 1 + self.assertEqual(scan.next_snapshot_id, 2) + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') + @patch('pypaimon.read.streaming_table_scan.FileScanner') + def test_scan_from_numeric_id( + self, MockFileScanner, MockManifestFileManager, + MockManifestListManager, MockSnapshotManager + ): + """scan_from=5 should set next_snapshot_id=5 without an initial full scan.""" + table, _ = _create_mock_table() + + mock_snapshot_manager = MockSnapshotManager.return_value + # get_latest_snapshot needed by _should_use_diff_catch_up + mock_snapshot_manager.get_latest_snapshot.return_value = _create_mock_snapshot(5) + mock_snapshot_manager.find_next_scannable.return_value = (None, 6, 0) + + scan = AsyncStreamingTableScan(table, scan_from=5, prefetch_enabled=False) + + async def check_state(): + gen = scan.stream() + try: + await asyncio.wait_for(gen.__anext__(), timeout=0.05) + except (asyncio.TimeoutError, StopAsyncIteration): + pass + + asyncio.run(check_state()) + # Numeric scan_from bypasses both earliest and latest initial-scan paths; + # try_get_earliest_snapshot must NOT have been called (that's the earliest path). + mock_snapshot_manager.try_get_earliest_snapshot.assert_not_called() + # FileScanner.scan was NOT called for an initial full-scan plan + MockFileScanner.return_value.scan.assert_not_called() + + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') + @patch('pypaimon.read.streaming_table_scan.FileScanner') + def test_scan_from_latest_matches_default( + self, MockFileScanner, MockManifestFileManager, + MockManifestListManager, MockSnapshotManager + ): + """scan_from='latest' should behave identically to the default (no scan_from).""" + table, _ = _create_mock_table() + + latest = _create_mock_snapshot(7) + mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager.get_latest_snapshot.return_value = latest + mock_snapshot_manager.get_snapshot_by_id.return_value = None + + MockFileScanner.return_value.scan.return_value = Plan([]) + + scan = AsyncStreamingTableScan(table, scan_from="latest", prefetch_enabled=False) + + async def get_first(): + async for plan in scan.stream(): + return plan + + asyncio.run(get_first()) + self.assertEqual(scan.next_snapshot_id, 8) + + @patch('pypaimon.read.streaming_table_scan.ConsumerManager') + @patch('pypaimon.read.streaming_table_scan.SnapshotManager') + @patch('pypaimon.read.streaming_table_scan.ManifestListManager') + @patch('pypaimon.read.streaming_table_scan.ManifestFileManager') + @patch('pypaimon.read.streaming_table_scan.FileScanner') + def test_consumer_restore_overrides_scan_from( + self, MockFileScanner, MockManifestFileManager, MockManifestListManager, + MockSnapshotManager, MockConsumerManager + ): + """Consumer restore should take precedence over scan_from='earliest'.""" + table, _ = _create_mock_table() + + mock_consumer_manager = MockConsumerManager.return_value + mock_consumer = Mock() + mock_consumer.next_snapshot = 10 + mock_consumer_manager.consumer.return_value = mock_consumer + + mock_snapshot_manager = MockSnapshotManager.return_value + mock_snapshot_manager.get_latest_snapshot.return_value = _create_mock_snapshot(10) + mock_snapshot_manager.find_next_scannable.return_value = (None, 11, 0) + + scan = AsyncStreamingTableScan( + table, scan_from="earliest", consumer_id="my-consumer", + prefetch_enabled=False + ) + + async def check_state(): + gen = scan.stream() + try: + await asyncio.wait_for(gen.__anext__(), timeout=0.05) + except (asyncio.TimeoutError, StopAsyncIteration): + pass + + asyncio.run(check_state()) + # Consumer restored position; try_get_earliest_snapshot must NOT have been called + mock_snapshot_manager.try_get_earliest_snapshot.assert_not_called() + # Consumer manager's consumer() was called to restore position + mock_consumer_manager.consumer.assert_called_once_with("my-consumer") + + if __name__ == '__main__': unittest.main()