Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions paimon-python/dev/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# limitations under the License.
################################################################################
cachetools>=4.2,<6; python_version=="3.6"
python-dateutil>=2.8.1,<3
cachetools>=5,<6; python_version>"3.6"
dataclasses>=0.8; python_version < "3.7"
fastavro>=1.4,<2
Expand Down
64 changes: 64 additions & 0 deletions paimon-python/pypaimon/cli/cli_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
from pypaimon.common.json_util import JSON


def cmd_table_stream(args):
    """Execute the 'table stream' command."""
    # Imported lazily so loading this module stays cheap and the streaming
    # implementation module is only pulled in when the command actually runs.
    from pypaimon.cli import cli_table_stream as _stream_impl
    _stream_impl.cmd_table_stream(args)


def cmd_table_read(args):
"""
Execute the 'table read' command.
Expand Down Expand Up @@ -783,6 +789,64 @@ def add_table_subcommands(table_parser):
)
rename_parser.set_defaults(func=cmd_table_rename)

# table stream command: wires up `table stream` and its options; dispatch
# goes through cmd_table_stream via set_defaults below.
stream_parser = table_subparsers.add_parser(
    'stream',
    help='Continuously stream new rows from a table until interrupted (Ctrl+C)'
)
# Positional: which table to stream from.
stream_parser.add_argument(
    'table',
    help='Table identifier in format: database.table'
)
stream_parser.add_argument(
    '--select', '-s',
    type=str,
    default=None,
    help='Select specific columns (comma-separated, e.g., "id,name")'
)
stream_parser.add_argument(
    '--where', '-w',
    type=str,
    default=None,
    help='Filter condition in SQL-like syntax (e.g., "age > 18")'
)
stream_parser.add_argument(
    '--format', '-f',
    type=str,
    choices=['table', 'json'],
    default='table',
    help='Output format: table (default) or json'
)
# NOTE: 'from' is a Python keyword, so the parsed value is exposed as
# args.from_position via dest= rather than args.from.
stream_parser.add_argument(
    '--from',
    dest='from_position',
    type=str,
    default='latest',
    help=(
        'Starting position: "latest" (default), "earliest", a numeric snapshot ID, '
        'or a timestamp (e.g., "2025-01-15", "2025-01-15T10:30:00Z")'
    )
)
stream_parser.add_argument(
    '--poll-interval-ms',
    type=int,
    default=1000,
    help='Milliseconds between snapshot polls (default: 1000)'
)
stream_parser.add_argument(
    '--include-row-kind',
    action='store_true',
    default=False,
    help='Include _row_kind column (+I, -U, +U, -D) in output'
)
stream_parser.add_argument(
    '--consumer-id',
    type=str,
    default=None,
    help='Consumer ID for persisting scan progress across invocations'
)
stream_parser.set_defaults(func=cmd_table_stream)

# table alter command
alter_parser = table_subparsers.add_parser('alter', help='Alter a table with schema changes')
alter_parser.add_argument(
Expand Down
216 changes: 216 additions & 0 deletions paimon-python/pypaimon/cli/cli_table_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
"""
Table stream command for Paimon CLI.

Continuously polls a table for new snapshots and prints rows as they arrive.
"""

import json
import sys
from typing import Union

from pypaimon.snapshot.snapshot_manager import SnapshotManager


def _parse_timestamp(value: str) -> int:
"""Parse a human-readable datetime string to epoch milliseconds.

Accepts ISO 8601, space-separated datetimes, date-only strings, and named
timezones (e.g. "2025-01-15T10:30:00 EST", "2025-01-15T10:30:00 Europe/London").
Naive datetimes (no timezone) are treated as local machine time.
Returns epoch ms as an integer.
Raises ValueError on unrecognised format.
"""
from dateutil import parser as dateutil_parser
from dateutil.parser import ParserError

try:
dt = dateutil_parser.parse(value)
except (ParserError, OverflowError):
raise ValueError(
f"Unrecognised timestamp format: '{value}'. "
"Expected formats: YYYY-MM-DD, YYYY-MM-DD HH:MM:SS, "
"YYYY-MM-DDTHH:MM:SS, YYYY-MM-DDTHH:MM:SSZ, YYYY-MM-DDTHH:MM:SS+HH:MM, "
"or a datetime with a named timezone (e.g. '2025-01-15T10:30:00 EST')"
)

if dt.tzinfo is None:
# Treat naive datetime as local time
dt = dt.astimezone()

return int(dt.timestamp() * 1000)


def parse_from_position(value: str, snapshot_manager: "SnapshotManager") -> Union[str, int]:
    """Resolve a --from value to a keyword or snapshot ID integer.

    Args:
        value: The raw --from argument (keyword, integer string, or timestamp).
        snapshot_manager: Used to resolve timestamps to snapshot IDs.

    Returns:
        "latest", "earliest", or an integer snapshot ID.

    Raises:
        ValueError: If the value is unrecognised or the timestamp precedes all snapshots.
    """
    # Normalise once so " latest " behaves like "latest" — previously only the
    # numeric branch stripped whitespace, making keyword handling inconsistent.
    value = value.strip()

    if value in ("latest", "earliest"):
        return value

    # Pure integer -> snapshot ID
    if value.isdigit():
        return int(value)

    # Anything containing '-', 'T', ':', or '/' is treated as a timestamp
    if any(c in value for c in ("-", "T", ":", "/")):
        epoch_ms = _parse_timestamp(value)
        snapshot = snapshot_manager.earlier_or_equal_time_mills(epoch_ms)
        if snapshot is None:
            raise ValueError(
                f"No snapshot found at or before '{value}'. "
                "The timestamp may predate all stored snapshots."
            )
        return snapshot.id

    raise ValueError(
        f"Unrecognised --from value: '{value}'. "
        "Use 'latest', 'earliest', a snapshot ID, or a timestamp."
    )


def cmd_table_stream(args):
    """Execute the 'table stream' command.

    Continuously reads new rows from a Paimon table and prints them to stdout
    until interrupted with Ctrl+C.

    Pipeline: load the catalog config, resolve the table, validate the
    --select projection against the table schema, parse the optional --where
    predicate, resolve the --from position (keyword, snapshot ID, or
    timestamp), configure a StreamReadBuilder, then loop over streaming
    plans printing each non-empty batch in the chosen output format.
    Any validation/setup failure prints to stderr and exits with status 1;
    Ctrl+C during streaming exits cleanly with status 0.

    Args:
        args: Parsed command line arguments.
    """
    # Imported inside the function — presumably to avoid an import cycle with
    # the main CLI module; confirm before hoisting to module level.
    from pypaimon.cli.cli import load_catalog_config, create_catalog
    from pypaimon.table.file_store_table import FileStoreTable

    config = load_catalog_config(args.config)
    catalog = create_catalog(config)

    # The identifier must be exactly "database.table" (one dot).
    table_identifier = args.table
    parts = table_identifier.split('.')
    if len(parts) != 2:
        print(
            f"Error: Invalid table identifier '{table_identifier}'. "
            "Expected format: 'database.table'",
            file=sys.stderr,
        )
        sys.exit(1)

    try:
        table = catalog.get_table(table_identifier)
    except Exception as e:
        print(f"Error: Failed to get table '{table_identifier}': {e}", file=sys.stderr)
        sys.exit(1)

    # Column names known to the table schema, used to validate --select.
    available_fields = set(field.name for field in table.table_schema.fields)

    # --- Validate and build projection ---
    user_columns = None
    if args.select:
        user_columns = [col.strip() for col in args.select.split(',')]
        invalid_columns = [col for col in user_columns if col not in available_fields]
        if invalid_columns:
            print(
                f"Error: Column(s) {invalid_columns} do not exist in table '{table_identifier}'.",
                file=sys.stderr,
            )
            sys.exit(1)

    # --- Parse WHERE predicate ---
    predicate = None
    if args.where:
        from pypaimon.cli.where_parser import parse_where_clause
        try:
            predicate = parse_where_clause(args.where, table.table_schema.fields)
        except ValueError as e:
            print(f"Error: Invalid WHERE clause: {e}", file=sys.stderr)
            sys.exit(1)

    # --- Resolve --from position ---
    # Keywords and plain snapshot IDs are handled inline; anything else is
    # assumed to be a timestamp, which needs the snapshot manager to map it
    # to a snapshot ID (only FileStoreTable exposes one).
    from_position = args.from_position
    if from_position not in ("latest", "earliest") and not from_position.strip().isdigit():
        # Likely a timestamp — need snapshot_manager to resolve it
        if not isinstance(table, FileStoreTable):
            print(
                "Error: Timestamp-based --from requires a FileStoreTable.",
                file=sys.stderr,
            )
            sys.exit(1)
        snapshot_manager = table.snapshot_manager()
        try:
            from_position = parse_from_position(from_position, snapshot_manager)
        except ValueError as e:
            print(f"Error: {e}", file=sys.stderr)
            sys.exit(1)
    elif from_position.strip().isdigit():
        from_position = int(from_position.strip())

    # --- Build StreamReadBuilder ---
    builder = table.new_stream_read_builder()

    if user_columns:
        builder = builder.with_projection(user_columns)
    if predicate:
        builder = builder.with_filter(predicate)
    if args.poll_interval_ms is not None:
        builder = builder.with_poll_interval_ms(args.poll_interval_ms)
    if args.include_row_kind:
        builder = builder.with_include_row_kind(True)
    if args.consumer_id:
        builder = builder.with_consumer_id(args.consumer_id)

    builder = builder.with_scan_from(from_position)

    scan = builder.new_streaming_scan()
    read = builder.new_read()
    output_format = getattr(args, 'format', 'table')

    # Tracks whether we still need to print the table header (table mode only).
    first_batch = True
    try:
        for plan in scan.stream_sync():
            splits = plan.splits()
            if not splits:
                continue

            df = read.to_pandas(splits)
            if df.empty:
                continue

            if output_format == 'json':
                # One JSON object per row; default=str stringifies any
                # non-JSON-native values (e.g. timestamps, decimals).
                for record in df.to_dict(orient='records'):
                    print(json.dumps(record, ensure_ascii=False, default=str))
            else:
                # "---" separates batches; the header is printed once.
                if not first_batch:
                    print("---")
                if first_batch:
                    print(df.to_string(index=False))
                    first_batch = False
                else:
                    # Print rows only (no header) for subsequent batches
                    print(df.to_string(index=False, header=False))

    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop streaming — not an error.
        sys.exit(0)
17 changes: 15 additions & 2 deletions paimon-python/pypaimon/read/stream_read_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
tables, similar to ReadBuilder but for continuous streaming use cases.
"""

from typing import Callable, List, Optional, Set
from typing import Callable, List, Optional, Set, Union

from pypaimon.common.predicate import Predicate
from pypaimon.common.predicate_builder import PredicateBuilder
Expand Down Expand Up @@ -59,6 +59,7 @@ def __init__(self, table):
self._include_row_kind: bool = False
self._bucket_filter: Optional[Callable[[int], bool]] = None
self._consumer_id: Optional[str] = None
self._scan_from: Optional[Union[str, int]] = None

def with_filter(self, predicate: Predicate) -> 'StreamReadBuilder':
"""Set a filter predicate for the streaming read."""
Expand Down Expand Up @@ -90,6 +91,17 @@ def with_consumer_id(self, consumer_id: str) -> 'StreamReadBuilder':
self._consumer_id = consumer_id
return self

def with_scan_from(self, position: Union[str, int]) -> 'StreamReadBuilder':
    """Set the starting position for the streaming scan.

    Args:
        position: One of "latest" (default), "earliest", or a positive
            integer snapshot ID (an integer-valued string is also accepted,
            since the scan converts it with int()). Timestamps must be
            resolved to a snapshot ID by the caller before passing here.

    Returns:
        This builder, for call chaining.

    Raises:
        ValueError: If position is not a recognised keyword, an int, or a
            digit string — failing fast here instead of with an opaque
            int() error deep inside the scan loop.
    """
    is_keyword = position in ("latest", "earliest")
    is_snapshot_id = isinstance(position, int) or (
        isinstance(position, str) and position.isdigit()
    )
    if not (is_keyword or is_snapshot_id):
        raise ValueError(
            f"Invalid scan position: {position!r}. "
            "Use 'latest', 'earliest', or a snapshot ID."
        )
    self._scan_from = position
    return self

def with_bucket_filter(
self,
bucket_filter: Callable[[int], bool]
Expand Down Expand Up @@ -120,7 +132,8 @@ def new_streaming_scan(self) -> AsyncStreamingTableScan:
predicate=self._predicate,
poll_interval_ms=self._poll_interval_ms,
bucket_filter=self._bucket_filter,
consumer_id=self._consumer_id
consumer_id=self._consumer_id,
scan_from=self._scan_from
)

def new_read(self) -> TableRead:
Expand Down
25 changes: 22 additions & 3 deletions paimon-python/pypaimon/read/streaming_table_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ def __init__(
bucket_filter: Optional[Callable[[int], bool]] = None,
prefetch_enabled: bool = True,
diff_threshold: int = 10,
consumer_id: Optional[str] = None
consumer_id: Optional[str] = None,
scan_from=None
):
"""Initialize the streaming table scan."""
self.table = table
Expand Down Expand Up @@ -118,6 +119,9 @@ def __init__(
# Auto-select based on changelog-producer if not explicitly provided
self.follow_up_scanner = follow_up_scanner or self._create_follow_up_scanner()

# Starting position (set via StreamReadBuilder.with_scan_from)
self._scan_from = scan_from

# State tracking
self.next_snapshot_id: Optional[int] = None
self._pending_consumer_snapshot: Optional[int] = None
Expand All @@ -131,13 +135,28 @@ async def stream(self) -> AsyncGenerator[Plan, None]:
Yields:
Plan objects containing splits for reading
"""
# Restore from consumer if available
# Restore from consumer if available (highest priority — overrides scan_from)
if self.next_snapshot_id is None and self._consumer_manager:
consumer = self._consumer_manager.consumer(self._consumer_id)
if consumer:
self.next_snapshot_id = consumer.next_snapshot

# Initial scan
# Resolve scan_from if no position has been set yet (no consumer restore)
if self.next_snapshot_id is None and self._scan_from is not None:
scan_from = self._scan_from
if scan_from == "earliest":
earliest_snapshot = self._snapshot_manager.try_get_earliest_snapshot()
if earliest_snapshot is not None:
self.next_snapshot_id = earliest_snapshot.id + 1
self._stage_consumer()
yield self._create_initial_plan(earliest_snapshot)
self._flush_pending_consumer()
# If no snapshot exists yet, fall through to the polling loop
elif scan_from != "latest":
# Numeric snapshot ID — set directly; diff catch-up handles large gaps
self.next_snapshot_id = int(scan_from)

# Initial scan (default "latest" path, also taken when scan_from is None or "latest")
if self.next_snapshot_id is None:
latest_snapshot = self._snapshot_manager.get_latest_snapshot()
if latest_snapshot:
Expand Down
Loading
Loading