Skip to content

Commit a4326f6

Browse files
committed
fix(streaming): filter non-streamable files by extension
Add SUPPORTED_EXTENSIONS class attribute to StreamingBackend base class and _filter_streamable_files() method to filter out non-image files (CSV, JSON, etc.) before streaming. This prevents crashes when special output files are accidentally passed to Napari/Fiji streaming backends. Non-streamable files are logged and skipped gracefully. Changes: - streaming.py: Add SUPPORTED_EXTENSIONS and _filter_streamable_files() - napari_stream.py: Use _filter_streamable_files() in save_batch() - fiji_stream.py: Use _filter_streamable_files() in save_batch()
1 parent 607f67c commit a4326f6

3 files changed

Lines changed: 58 additions & 1 deletion

File tree

src/polystore/fiji_stream.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,11 @@ def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], *
7171

7272
logger.info(f"📦 FIJI BACKEND: save_batch called with {len(data_list)} items")
7373

74+
# Filter to only supported file types
75+
data_list, file_paths, skipped = self._filter_streamable_files(data_list, file_paths)
76+
if not data_list:
77+
return
78+
7479
# Extract kwargs using generic polymorphic names
7580
host = kwargs.get('host', 'localhost')
7681
port = kwargs['port']

src/polystore/napari_stream.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ def save_batch(self, data_list: List[Any], file_paths: List[Union[str, Path]], *
7272
file_paths: List of path identifiers
7373
**kwargs: Additional metadata
7474
"""
75+
# Filter to only supported file types
76+
data_list, file_paths, skipped = self._filter_streamable_files(data_list, file_paths)
77+
if not data_list:
78+
return
79+
7580
# Extract kwargs using generic polymorphic names
7681
host = kwargs.get('host', 'localhost')
7782
port = kwargs['port']

src/polystore/streaming.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import time
1111
import uuid
1212
from pathlib import Path
13-
from typing import Any, Callable, List, Union
13+
from typing import Any, Callable, List, Set, Union
1414
import numpy as np
1515

1616
from .base import DataSink
@@ -45,11 +45,58 @@ class StreamingBackend(DataSink):
4545
VIEWER_TYPE: str = None
4646
SHM_PREFIX: str = None
4747

48+
# Class attribute: streaming backends only support image array data and ROIs
49+
supports_arbitrary_files: bool = False
50+
51+
# Extensions that streaming backends can handle
52+
# Subclasses can override to add support for specific formats
53+
SUPPORTED_EXTENSIONS: set[str] = {'.tif', '.tiff', '.png', '.jpg', '.jpeg', '.roi.zip'}
54+
4855
@property
4956
def requires_filesystem_validation(self) -> bool:
5057
"""Streaming backends don't require filesystem validation."""
5158
return False
5259

60+
def _filter_streamable_files(
61+
self,
62+
data_list: List[Any],
63+
file_paths: List[Union[str, Path]],
64+
) -> tuple[List[Any], List[Union[str, Path]], List[Union[str, Path]]]:
65+
"""
66+
Filter data to only include files with supported extensions.
67+
68+
Args:
69+
data_list: List of data objects
70+
file_paths: List of file paths
71+
72+
Returns:
73+
Tuple of (filtered_data, filtered_paths, skipped_paths)
74+
"""
75+
filtered_data = []
76+
filtered_paths = []
77+
skipped_paths = []
78+
79+
for data, path in zip(data_list, file_paths):
80+
path_obj = Path(path)
81+
name = path_obj.name.lower()
82+
83+
# Check if extension is supported
84+
is_supported = any(name.endswith(ext) for ext in self.SUPPORTED_EXTENSIONS)
85+
86+
if is_supported:
87+
filtered_data.append(data)
88+
filtered_paths.append(path)
89+
else:
90+
skipped_paths.append(path)
91+
92+
if skipped_paths:
93+
logger.info(
94+
f"{self.VIEWER_TYPE}: Skipping {len(skipped_paths)} non-streamable files: "
95+
f"{[str(p) for p in skipped_paths]}"
96+
)
97+
98+
return filtered_data, filtered_paths, skipped_paths
99+
53100
def __init__(self, transport_config=None):
54101
"""Initialize ZeroMQ and shared memory infrastructure."""
55102
self._publishers = {}

0 commit comments

Comments
 (0)