From e5ed42ea28e797be0f5106c649cbdc10489d2423 Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Wed, 11 Mar 2026 21:33:30 +0800 Subject: [PATCH 1/5] Support pyjindo in pyarrow_file_io --- .../filesystem/jindo_file_system_handler.py | 287 ++++++++++ .../pypaimon/filesystem/pyarrow_file_io.py | 28 +- paimon-python/pypaimon/tests/file_io_test.py | 54 +- .../pypaimon/tests/jindo_file_system_test.py | 504 ++++++++++++++++++ .../pypaimon/tests/oss_file_io_test.py | 360 +++++++++++++ 5 files changed, 1207 insertions(+), 26 deletions(-) create mode 100644 paimon-python/pypaimon/filesystem/jindo_file_system_handler.py create mode 100644 paimon-python/pypaimon/tests/jindo_file_system_test.py create mode 100644 paimon-python/pypaimon/tests/oss_file_io_test.py diff --git a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py new file mode 100644 index 000000000000..9fa98cc71454 --- /dev/null +++ b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py @@ -0,0 +1,287 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
import logging

import pyarrow as pa
from pyarrow import PythonFile
from pyarrow._fs import FileSystemHandler
from pyarrow.fs import FileInfo, FileSelector, FileType

try:
    import pyjindo.fs as jfs
    import pyjindo.util as jutil
    JINDO_AVAILABLE = True
except ImportError:
    JINDO_AVAILABLE = False
    jfs = None
    jutil = None

from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions


class JindoInputFile:
    """Readable file-like adapter around a stream returned by ``jfs.open``.

    It exposes ``read``/``seek``/``tell``/``close`` so the stream can be
    wrapped in ``pyarrow.PythonFile``, plus ``read_at`` for positional reads.
    Every I/O method raises ``ValueError`` once the file is closed.
    """

    def __init__(self, jindo_stream):
        self._stream = jindo_stream
        self._closed = False

    @property
    def closed(self):
        """Closed flag; the stream's own ``closed`` attribute wins if present."""
        if hasattr(self._stream, 'closed'):
            return self._stream.closed
        return self._closed

    def _ensure_open(self):
        """Raise ``ValueError`` when the file has already been closed."""
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def read(self, nbytes: int = -1):
        """Read ``nbytes`` bytes; ``None`` or a negative value reads without a size."""
        self._ensure_open()
        if nbytes is None or nbytes < 0:
            return self._stream.read()
        return self._stream.read(nbytes)

    def seek(self, position: int, whence: int = 0):
        """Forward ``seek(position, whence)`` to the underlying stream."""
        self._ensure_open()
        self._stream.seek(position, whence)

    def tell(self) -> int:
        """Return the position reported by the underlying stream."""
        self._ensure_open()
        return self._stream.tell()

    def read_at(self, nbytes: int, offset: int):
        """Positional read, delegated to the stream's ``pread(nbytes, offset)``."""
        self._ensure_open()
        return self._stream.pread(nbytes, offset)

    def close(self):
        """Close the underlying stream once; later calls are no-ops."""
        if not self._closed:
            self._stream.close()
            self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False


class JindoOutputFile:
    """Writable file-like adapter around a stream returned by ``jfs.open``.

    Every I/O method raises ``ValueError`` once the file is closed.
    """

    def __init__(self, jindo_stream):
        self._stream = jindo_stream
        self._closed = False

    @property
    def closed(self):
        """Closed flag; the stream's own ``closed`` attribute wins if present."""
        if hasattr(self._stream, 'closed'):
            return self._stream.closed
        return self._closed

    def _ensure_open(self):
        """Raise ``ValueError`` when the file has already been closed."""
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def write(self, data: bytes) -> int:
        """Write ``data`` and return whatever the stream's ``write`` returns.

        Accepts ``bytes``, ``pyarrow.Buffer``, ``bytearray`` and ``memoryview``;
        anything else raises ``TypeError``. Non-``bytes`` inputs are copied to
        ``bytes`` before being handed to the stream.
        """
        self._ensure_open()
        if isinstance(data, pa.Buffer):
            data = data.to_pybytes()
        elif isinstance(data, (bytearray, memoryview)):
            data = bytes(data)
        elif not isinstance(data, bytes):
            raise TypeError("Unsupported data type")
        return self._stream.write(data)

    def flush(self):
        """Flush the stream when it offers ``flush``; otherwise do nothing."""
        self._ensure_open()
        if hasattr(self._stream, 'flush'):
            self._stream.flush()

    def close(self):
        """Close the underlying stream once; later calls are no-ops."""
        if not self._closed:
            self._stream.close()
            self._closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Close on exit, mirroring JindoInputFile; without this a ``with`` block
        # would leave the stream open.
        self.close()
        return False


class JindoFileSystemHandler(FileSystemHandler):
    """``pyarrow`` ``FileSystemHandler`` that forwards operations to pyjindo.

    Paths that do not start with ``oss://`` are resolved against ``root_path``.
    """

    def __init__(self, root_path: str, catalog_options: Options):
        """Connect to ``root_path`` using the OSS settings in ``catalog_options``.

        Raises:
            ImportError: if pyjindo could not be imported.
        """
        if not JINDO_AVAILABLE:
            raise ImportError("pyjindo is not available. Please install pyjindo.")

        self.logger = logging.getLogger(__name__)
        self.root_path = root_path
        self.properties = catalog_options

        # Build the jindo configuration from catalog_options; unset options are
        # simply not written to the config.
        config = jutil.Config()

        access_key_id = catalog_options.get(OssOptions.OSS_ACCESS_KEY_ID)
        access_key_secret = catalog_options.get(OssOptions.OSS_ACCESS_KEY_SECRET)
        security_token = catalog_options.get(OssOptions.OSS_SECURITY_TOKEN)
        endpoint = catalog_options.get(OssOptions.OSS_ENDPOINT)
        region = catalog_options.get(OssOptions.OSS_REGION)

        if access_key_id:
            config.set("fs.oss.accessKeyId", access_key_id)
        if access_key_secret:
            config.set("fs.oss.accessKeySecret", access_key_secret)
        if security_token:
            config.set("fs.oss.securityToken", security_token)
        if endpoint:
            config.set("fs.oss.endpoint", self._strip_scheme(endpoint))
        if region:
            config.set("fs.oss.region", region)

        # NOTE(review): the literal "root" is the second connect() argument --
        # presumably a user name; confirm against the pyjindo API.
        self._jindo_fs = jfs.connect(self.root_path, "root", config)

    @staticmethod
    def _strip_scheme(endpoint: str) -> str:
        """Drop a leading ``http://`` or ``https://`` from ``endpoint``."""
        for prefix in ('http://', 'https://'):
            if endpoint.startswith(prefix):
                return endpoint[len(prefix):]
        return endpoint

    def __eq__(self, other):
        if isinstance(other, JindoFileSystemHandler):
            return self.root_path == other.root_path
        return NotImplemented

    def __ne__(self, other):
        if isinstance(other, JindoFileSystemHandler):
            return not self.__eq__(other)
        return NotImplemented

    def __hash__(self):
        # Defining __eq__ alone would set __hash__ to None; hash on the same
        # field that equality compares.
        return hash(self.root_path)

    def _normalize_path(self, path: str) -> str:
        """Return ``path`` as an ``oss://`` URI.

        Empty and ``'.'`` map to the root (with a trailing slash), ``oss://``
        paths are returned untouched, anything else is joined onto the root.
        """
        root = self.root_path.rstrip('/') + '/'
        if not path or path == '.':
            return root
        if path.startswith('oss://'):
            return path
        return root + path.lstrip('/')

    def _convert_file_type(self, jindo_type) -> FileType:
        """Map a ``jfs.FileType`` to the pyarrow ``FileType`` (else ``Unknown``)."""
        if jindo_type == jfs.FileType.File:
            return FileType.File
        if jindo_type == jfs.FileType.Directory:
            return FileType.Directory
        return FileType.Unknown

    def _convert_file_info(self, jindo_info) -> FileInfo:
        """Build a pyarrow ``FileInfo``; size is only reported for files."""
        is_file = jindo_info.type == jfs.FileType.File
        return FileInfo(
            path=jindo_info.path,
            type=self._convert_file_type(jindo_info.type),
            size=jindo_info.size if is_file else None,
            mtime=getattr(jindo_info, 'mtime', None),
        )

    def get_type_name(self) -> str:
        return "jindo"

    def get_file_info(self, paths) -> list:
        """Return one ``FileInfo`` per path; missing paths become ``NotFound``."""
        infos = []
        for path in paths:
            normalized = self._normalize_path(path)
            try:
                jindo_info = self._jindo_fs.get_file_info(normalized)
            except FileNotFoundError:
                infos.append(FileInfo(normalized, FileType.NotFound))
            else:
                infos.append(self._convert_file_info(jindo_info))
        return infos

    def get_file_info_selector(self, selector: FileSelector) -> list:
        """List ``selector.base_dir``; a missing directory yields ``[]`` only
        when ``selector.allow_not_found`` is set, otherwise the error propagates.
        """
        normalized = self._normalize_path(selector.base_dir)
        try:
            items = self._jindo_fs.listdir(normalized, recursive=selector.recursive)
        except FileNotFoundError:
            if selector.allow_not_found:
                return []
            raise
        return [self._convert_file_info(item) for item in items]

    def create_dir(self, path: str, recursive: bool):
        # ``recursive`` is accepted for the handler interface but not forwarded.
        # NOTE(review): presumably jindo's mkdir creates missing parents -- confirm.
        self._jindo_fs.mkdir(self._normalize_path(path))

    def delete_dir(self, path: str):
        self._jindo_fs.remove(self._normalize_path(path))

    def delete_dir_contents(self, path: str, missing_dir_ok: bool = False):
        """Delete the entries directly under ``path``, refusing the root.

        Raises:
            ValueError: if ``path`` resolves to the root path.
        """
        normalized = self._normalize_path(path)
        # Compare without trailing slashes so the guard holds whether or not
        # root_path / path were written with one.
        if normalized.rstrip('/') == self.root_path.rstrip('/'):
            raise ValueError(
                "delete_dir_contents() does not accept root path"
            )
        self._delete_dir_contents(path, missing_dir_ok)

    def delete_root_dir_contents(self):
        self._delete_dir_contents("/", missing_dir_ok=False)

    def _delete_dir_contents(self, path: str, missing_dir_ok: bool):
        """Remove every entry returned by a non-recursive listing of ``path``."""
        normalized = self._normalize_path(path)
        try:
            items = self._jindo_fs.listdir(normalized, recursive=False)
        except FileNotFoundError:
            if missing_dir_ok:
                return
            raise
        except Exception as e:
            self.logger.warning("Error listing %s: %s", path, e)
            raise
        for item in items:
            self._jindo_fs.remove(item.path)

    def delete_file(self, path: str):
        self._jindo_fs.remove(self._normalize_path(path))

    def move(self, src: str, dest: str):
        self._jindo_fs.rename(self._normalize_path(src), self._normalize_path(dest))

    def copy_file(self, src: str, dest: str):
        self._jindo_fs.copy_file(self._normalize_path(src), self._normalize_path(dest))

    def _open_for_read(self, path: str):
        """Open ``path`` in ``"rb"`` mode and wrap it for pyarrow."""
        jindo_stream = self._jindo_fs.open(self._normalize_path(path), "rb")
        return PythonFile(JindoInputFile(jindo_stream), mode="r")

    def open_input_stream(self, path: str):
        return self._open_for_read(path)

    def open_input_file(self, path: str):
        return self._open_for_read(path)

    def open_output_stream(self, path: str, metadata):
        # ``metadata`` is part of the handler interface and is not used here.
        jindo_stream = self._jindo_fs.open(self._normalize_path(path), "wb")
        return PythonFile(JindoOutputFile(jindo_stream), mode="w")

    def open_append_stream(self, path: str, metadata):
        raise IOError("append mode is not supported")

    def normalize_path(self, path: str) -> str:
        return self._normalize_path(path)
self._normalize_path(path) + diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index c4b64445f701..384cbe7b6c6a 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -41,6 +41,11 @@ from pypaimon.table.row.row_kind import RowKind from pypaimon.write.blob_format_writer import BlobFormatWriter +try: + from pypaimon.filesystem.jindo_file_system_handler import JindoFileSystemHandler, JINDO_AVAILABLE +except ImportError: + JINDO_AVAILABLE = False + JindoFileSystemHandler = None def _pyarrow_lt_7(): return parse(pyarrow.__version__) < parse("7.0.0") @@ -56,9 +61,14 @@ def __init__(self, path: str, catalog_options: Options): self.uri_reader_factory = UriReaderFactory(catalog_options) self._is_oss = scheme in {"oss"} self._oss_bucket = None + self.use_jindo_fs = False if self._is_oss: self._oss_bucket = self._extract_oss_bucket(path) - self.filesystem = self._initialize_oss_fs(path) + # Try to use JindoFileSystem if available, otherwise fall back to S3FileSystem + if JINDO_AVAILABLE: + self.filesystem = self._initialize_jindo_fs(path) + else: + self.filesystem = self._initialize_oss_fs(path) elif scheme in {"s3", "s3a", "s3n"}: self.filesystem = self._initialize_s3_fs() elif scheme in {"hdfs", "viewfs"}: @@ -116,6 +126,13 @@ def _extract_oss_bucket(self, location) -> str: raise ValueError("Invalid OSS URI without bucket: {}".format(location)) return bucket + def _initialize_jindo_fs(self, path) -> FileSystem: + """Initialize JindoFileSystem for OSS access.""" + root_path = f"oss://{self._oss_bucket}/" + fs_handler = JindoFileSystemHandler(root_path, self.properties) + self.use_jindo_fs = True + return pafs.PyFileSystem(fs_handler) + def _initialize_oss_fs(self, path) -> FileSystem: client_kwargs = { "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID), @@ -184,7 +201,9 @@ def new_input_stream(self, path: str): def 
new_output_stream(self, path: str): path_str = self.to_filesystem_path(path) - if self._is_oss and not self._pyarrow_gte_7: + if self.use_jindo_fs: + pass + elif self._is_oss and not self._pyarrow_gte_7: # For PyArrow 6.x + OSS, path_str is already just the key part if '/' in path_str: parent_dir = '/'.join(path_str.split('/')[:-1]) @@ -546,6 +565,11 @@ def to_filesystem_path(self, path: str) -> str: path_part = normalized_path.lstrip('/') return f"{drive_letter}:/{path_part}" if path_part else f"{drive_letter}:" + if self.use_jindo_fs: + # For JindoFileSystem, pass key only + path_part = normalized_path.lstrip('/') + return path_part if path_part else '.' + if isinstance(self.filesystem, S3FileSystem): if parsed.scheme: if parsed.netloc: diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py index ba91f94a18e6..52d93d176eba 100644 --- a/paimon-python/pypaimon/tests/file_io_test.py +++ b/paimon-python/pypaimon/tests/file_io_test.py @@ -67,11 +67,14 @@ def test_filesystem_path_conversion(self): lt7 = _pyarrow_lt_7() oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({ - OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com' + OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com', + OssOptions.OSS_ACCESS_KEY_ID.key(): 'test-key', + OssOptions.OSS_ACCESS_KEY_SECRET.key(): 'test-secret', })) + use_jindo_fs = oss_io.use_jindo_fs got = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") - self.assertEqual(got, "path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt") - if lt7: + self.assertEqual(got, "path/to/file.txt" if lt7 or use_jindo_fs else "test-bucket/path/to/file.txt") + if lt7 or use_jindo_fs: self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx/data.parquet"), "db-xxx.db/tbl-xxx/data.parquet") self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx"), "db-xxx.db/tbl-xxx") @@ -88,26 +91,27 @@ def record_get_file_info(paths): 
get_file_info_calls.append(list(paths)) return [MagicMock(type=pafs.FileType.NotFound) for _ in paths] - mock_fs = MagicMock() - mock_fs.get_file_info.side_effect = record_get_file_info if lt7 else [[nf], [nf]] - mock_fs.create_dir = MagicMock() - mock_fs.open_output_stream.return_value = MagicMock() - oss_io.filesystem = mock_fs - oss_io.new_output_stream("oss://test-bucket/path/to/file.txt") - mock_fs.create_dir.assert_called_once() - path_str = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") - if lt7: - expected_parent = '/'.join(path_str.split('/')[:-1]) if '/' in path_str else '' - else: - expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in path_str else str(Path(path_str).parent) - self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent) - if lt7: - for call_paths in get_file_info_calls: - for p in call_paths: - self.assertFalse( - p.startswith("test-bucket/"), - "OSS+PyArrow<7 must pass key only to get_file_info, not bucket/key. Got: %r" % (p,) - ) + if not use_jindo_fs: + mock_fs = MagicMock() + mock_fs.get_file_info.side_effect = record_get_file_info if lt7 else [[nf], [nf]] + mock_fs.create_dir = MagicMock() + mock_fs.open_output_stream.return_value = MagicMock() + oss_io.filesystem = mock_fs + oss_io.new_output_stream("oss://test-bucket/path/to/file.txt") + mock_fs.create_dir.assert_called_once() + path_str = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") + if lt7: + expected_parent = '/'.join(path_str.split('/')[:-1]) if '/' in path_str else '' + else: + expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in path_str else str(Path(path_str).parent) + self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent) + if lt7: + for call_paths in get_file_info_calls: + for p in call_paths: + self.assertFalse( + p.startswith("test-bucket/"), + "OSS+PyArrow<7 must pass key only to get_file_info, not bucket/key. 
Got: %r" % (p,) + ) def test_exists(self): lt7 = _pyarrow_lt_7() @@ -286,7 +290,9 @@ def test_exists_does_not_catch_exception(self): file_io.delete_directory_quietly("file:///some/path") oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({ - OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com' + OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com', + OssOptions.OSS_ACCESS_KEY_ID.key(): 'test-key', + OssOptions.OSS_ACCESS_KEY_SECRET.key(): 'test-secret', })) mock_fs = MagicMock() mock_fs.get_file_info.return_value = [ diff --git a/paimon-python/pypaimon/tests/jindo_file_system_test.py b/paimon-python/pypaimon/tests/jindo_file_system_test.py new file mode 100644 index 000000000000..4a8dce59c401 --- /dev/null +++ b/paimon-python/pypaimon/tests/jindo_file_system_test.py @@ -0,0 +1,504 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
################################################################################
import os
import random
import time
import unittest
import uuid

import pyarrow.fs as pafs

from pyarrow.fs import PyFileSystem
from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions
from pypaimon.filesystem.jindo_file_system_handler import JindoFileSystemHandler, JINDO_AVAILABLE


class JindoFileSystemTest(unittest.TestCase):
    """Test cases for JindoFileSystem.

    The tests talk to a real bucket and are skipped unless pyjindo is
    importable and OSS_TEST_BUCKET, OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET
    and OSS_ENDPOINT are all set in the environment.
    """

    def setUp(self):
        """Set up test fixtures."""
        # skipTest() raises, so nothing after a failed check is executed.
        if not JINDO_AVAILABLE:
            self.skipTest("pyjindo is not available")

        bucket = os.environ.get("OSS_TEST_BUCKET")
        access_key_id = os.environ.get("OSS_ACCESS_KEY_ID")
        access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET")
        endpoint = os.environ.get("OSS_ENDPOINT")
        if not bucket:
            self.skipTest("test bucket is not configured")
        if not access_key_id:
            self.skipTest("test access key id is not configured")
        if not access_key_secret:
            self.skipTest("test access key secret is not configured")
        if not endpoint:
            self.skipTest("test endpoint is not configured")

        self.root_path = f"oss://{bucket}/"
        self.catalog_options = Options({
            OssOptions.OSS_ACCESS_KEY_ID.key(): access_key_id,
            OssOptions.OSS_ACCESS_KEY_SECRET.key(): access_key_secret,
            OssOptions.OSS_ENDPOINT.key(): endpoint,
        })

        # Create JindoFileSystemHandler instance
        fs_handler = JindoFileSystemHandler(self.root_path, self.catalog_options)
        self.fs = PyFileSystem(fs_handler)

        # Create unique test prefix to avoid conflicts
        self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/"

    def tearDown(self):
        """Clean up test files and directories."""
        test_dir = f"{self.root_path}{self.test_prefix}"
        try:
            file_info = self.fs.get_file_info(test_dir)
            if file_info.type == pafs.FileType.Directory:
                self.fs.delete_dir(test_dir)
        except Exception:
            # Cleanup is best-effort: a failure here must not fail the test.
            pass

    def _get_test_path(self, name: str) -> str:
        """Get a test path under test_prefix."""
        return f"{self.root_path}{self.test_prefix}{name}"

    def _write_file(self, path: str, data: bytes):
        """Write ``data`` to ``path``; the stream is closed even if write fails."""
        with self.fs.open_output_stream(path) as out_stream:
            out_stream.write(data)

    def _assert_cannot_open(self, path: str):
        """Assert that opening and reading ``path`` raises FileNotFoundError."""
        with self.assertRaises(FileNotFoundError):
            with self.fs.open_input_file(path) as in_file:
                in_file.read()

    def _assert_cannot_list(self, directory: str):
        """Assert that a strict, non-recursive listing raises FileNotFoundError."""
        with self.assertRaises(FileNotFoundError):
            selector = pafs.FileSelector(directory, recursive=False, allow_not_found=False)
            self.fs.get_file_info(selector)

    def test_get_root_file_info(self):
        """Test get_file_info for root path."""
        file_info = self.fs.get_file_info(self.root_path)

        self.assertIsInstance(file_info, pafs.FileInfo)
        self.assertEqual(file_info.type, pafs.FileType.Directory)

    def test_create_dir_recursive(self):
        """Test create_dir with recursive=True."""
        test_dir = self._get_test_path("nested/deep/dir/")

        self.fs.create_dir(test_dir, recursive=True)

        file_info = self.fs.get_file_info(test_dir)
        self.assertEqual(file_info.type, pafs.FileType.Directory)

    def test_write_and_read_small_file(self):
        """Test writing and reading a small file (10 bytes) with random data."""
        test_file = self._get_test_path("small-file.txt")
        test_data = os.urandom(10)

        self._write_file(test_file, test_data)

        with self.fs.open_input_file(test_file) as in_file:
            read_data = in_file.read()

        self.assertEqual(test_data, read_data)
        self.assertEqual(len(read_data), 10)

        file_info = self.fs.get_file_info(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertEqual(file_info.size, 10)

    def test_write_and_read_large_file(self):
        """Test writing and reading a large file (20MB) with random data."""
        test_file = self._get_test_path("large-file.bin")
        file_size = 20 * 1024 * 1024  # 20MB
        test_data = os.urandom(file_size)

        # Write file in 1MB chunks
        chunk_size = 1024 * 1024
        with self.fs.open_output_stream(test_file) as out_stream:
            for start in range(0, len(test_data), chunk_size):
                out_stream.write(test_data[start:start + chunk_size])

        file_info = self.fs.get_file_info(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertEqual(file_info.size, file_size)

        with self.fs.open_input_file(test_file) as in_file:
            # tell() starts at position 0
            self.assertEqual(in_file.tell(), 0)

            # read() with a specific size advances the position
            chunk1 = in_file.read(1024)
            self.assertEqual(len(chunk1), 1024)
            self.assertEqual(in_file.tell(), 1024)
            self.assertEqual(chunk1, test_data[0:1024])

            # seek() to the middle of the file
            middle_pos = file_size // 2
            in_file.seek(middle_pos)
            self.assertEqual(in_file.tell(), middle_pos)

            # read_at() from the start
            read_at_data = in_file.read_at(1024, 0)
            self.assertEqual(len(read_at_data), 1024)
            self.assertEqual(read_at_data, test_data[0:1024])

            # read_at() at a different offset
            offset = file_size - 2048
            read_at_data2 = in_file.read_at(1024, offset)
            self.assertEqual(len(read_at_data2), 1024)
            self.assertEqual(read_at_data2, test_data[offset:offset + 1024])

            # seek() to end
            in_file.seek(file_size)
            self.assertEqual(in_file.tell(), file_size)

            # read() at end returns empty bytes
            empty_data = in_file.read(100)
            self.assertEqual(len(empty_data), 0)

            # read() without size reads everything from the current position
            in_file.seek(0)
            all_data = in_file.read()
            self.assertEqual(len(all_data), file_size)
            self.assertEqual(all_data, test_data)

        # Verify complete file read on a fresh handle
        with self.fs.open_input_file(test_file) as in_file:
            complete_data = in_file.read()
            self.assertEqual(complete_data, test_data)
            self.assertEqual(len(complete_data), file_size)

    def test_get_file_info_single_path(self):
        """Test get_file_info with single path."""
        test_file = self._get_test_path("info-test.txt")
        test_data = b"test content"

        self._write_file(test_file, test_data)

        file_info = self.fs.get_file_info(test_file)
        self.assertIsInstance(file_info, pafs.FileInfo)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertEqual(file_info.size, len(test_data))

    def test_get_file_info_list(self):
        """Test get_file_info with list of paths."""
        test_file = self._get_test_path("info-test1.txt")
        test_dir = self._get_test_path("dir1")

        self._write_file(test_file, b"content1")
        self.fs.create_dir(test_dir)

        file_infos = self.fs.get_file_info([test_file, test_dir])
        self.assertIsInstance(file_infos, list)
        self.assertEqual(len(file_infos), 2)
        self.assertEqual(file_infos[0].type, pafs.FileType.File)
        self.assertIn(test_file, file_infos[0].path)
        self.assertEqual(file_infos[1].type, pafs.FileType.Directory)
        self.assertIn(test_dir, file_infos[1].path)

    def test_get_file_info_with_selector(self):
        """Test get_file_info with FileSelector."""
        test_dir = self._get_test_path("selector-test/")
        test_file1 = self._get_test_path("selector-test/file1.txt")
        test_file2 = self._get_test_path("selector-test/file2.txt")

        self.fs.create_dir(test_dir)
        self._write_file(test_file1, b"content1")
        self._write_file(test_file2, b"content2")

        # Non-recursive listing
        selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False)
        file_infos = self.fs.get_file_info(selector)
        self.assertIsInstance(file_infos, list)
        self.assertEqual(len(file_infos), 2)

        file_names = [info.path for info in file_infos]
        self.assertTrue(any("file1.txt" in name for name in file_names))
        self.assertTrue(any("file2.txt" in name for name in file_names))

    def test_get_file_info_with_selector_recursive(self):
        """Test get_file_info with FileSelector recursive=True."""
        test_dir = self._get_test_path("selector-recursive/")
        test_subdir = self._get_test_path("selector-recursive/subdir/")
        test_file1 = self._get_test_path("selector-recursive/file1.txt")
        test_file2 = self._get_test_path("selector-recursive/subdir/file2.txt")

        self.fs.create_dir(test_dir)
        self.fs.create_dir(test_subdir)
        self._write_file(test_file1, b"content1")
        self._write_file(test_file2, b"content2")

        selector = pafs.FileSelector(test_dir, recursive=True, allow_not_found=False)
        file_infos = self.fs.get_file_info(selector)
        self.assertIsInstance(file_infos, list)
        self.assertEqual(len(file_infos), 3)

    def test_get_file_info_not_found(self):
        """Test get_file_info for non-existent file."""
        non_existent = self._get_test_path("non-existent-file.txt")

        file_info = self.fs.get_file_info(non_existent)
        self.assertEqual(file_info.type, pafs.FileType.NotFound)

        # Opening a non-existent file should raise FileNotFoundError
        self._assert_cannot_open(non_existent)

    def test_get_file_info_selector_allow_not_found(self):
        """Test get_file_info with FileSelector allow_not_found=True."""
        non_existent_dir = self._get_test_path("non-existent-dir/")

        selector = pafs.FileSelector(non_existent_dir, recursive=False, allow_not_found=True)
        file_infos = self.fs.get_file_info(selector)
        self.assertIsInstance(file_infos, list)
        self.assertEqual(len(file_infos), 0)

    def test_delete_file(self):
        """Test delete_file method."""
        test_file = self._get_test_path("delete-test.txt")

        self._write_file(test_file, b"test content")

        file_info = self.fs.get_file_info(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)

        self.fs.delete_file(test_file)

        # The deleted file can no longer be opened
        self._assert_cannot_open(test_file)

    def test_delete_dir(self):
        """Test delete_dir method."""
        test_dir = self._get_test_path("delete-dir/")

        self.fs.create_dir(test_dir)

        file_info = self.fs.get_file_info(test_dir)
        self.assertEqual(file_info.type, pafs.FileType.Directory)

        self.fs.delete_dir(test_dir)

        # The deleted directory can no longer be listed
        self._assert_cannot_list(test_dir)

    def test_delete_dir_contents(self):
        """Test delete_dir_contents method."""
        test_dir = self._get_test_path("delete-contents/")
        test_file1 = self._get_test_path("delete-contents/file1.txt")
        test_file2 = self._get_test_path("delete-contents/file2.txt")
        test_subdir = self._get_test_path("delete-contents/subdir/")
        test_file3 = self._get_test_path("delete-contents/subdir/file3.txt")

        self.fs.create_dir(test_dir)
        self._write_file(test_file1, b"content1")
        self._write_file(test_file2, b"content2")
        self.fs.create_dir(test_subdir)
        self._write_file(test_file3, b"content3")

        self.fs.delete_dir_contents(test_dir)

        # The directory itself survives but is empty
        file_info = self.fs.get_file_info(test_dir)
        self.assertEqual(file_info.type, pafs.FileType.Directory)
        selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False)
        file_infos = self.fs.get_file_info(selector)
        self.assertEqual(len(file_infos), 0)

        # None of the files can be opened any more
        self._assert_cannot_open(test_file1)
        self._assert_cannot_open(test_file2)
        self._assert_cannot_open(test_file3)

    def test_move_file(self):
        """Test move method for file."""
        src_file = self._get_test_path("move-src.txt")
        dst_file = self._get_test_path("move-dst.txt")
        test_data = b"move test content"

        self._write_file(src_file, test_data)

        self.fs.move(src_file, dst_file)

        # Source is gone
        self._assert_cannot_open(src_file)

        # Destination exists with the original content
        dst_info = self.fs.get_file_info(dst_file)
        self.assertEqual(dst_info.type, pafs.FileType.File)

        with self.fs.open_input_file(dst_file) as in_file:
            read_data = in_file.read()
            self.assertEqual(read_data, test_data)

    def test_move_directory(self):
        """Test move method for directory."""
        src_dir = self._get_test_path("move-src-dir/")
        dst_dir = self._get_test_path("move-dst-dir/")
        src_file = self._get_test_path("move-src-dir/file.txt")

        self.fs.create_dir(src_dir)
        self._write_file(src_file, b"test content")

        self.fs.move(src_dir, dst_dir)

        # Source directory can no longer be listed
        self._assert_cannot_list(src_dir)

        # Destination exists
        dst_info = self.fs.get_file_info(dst_dir)
        self.assertEqual(dst_info.type, pafs.FileType.Directory)

    def test_copy_file(self):
        """Test copy_file method."""
        src_file = self._get_test_path("copy-src.txt")
        dst_file = self._get_test_path("copy-dst.txt")
        test_data = os.urandom(1024 * 1024)  # 1MB random data

        self._write_file(src_file, test_data)

        self.fs.copy_file(src_file, dst_file)

        # Both files exist
        src_info = self.fs.get_file_info(src_file)
        self.assertEqual(src_info.type, pafs.FileType.File)
        dst_info = self.fs.get_file_info(dst_file)
        self.assertEqual(dst_info.type, pafs.FileType.File)

        # Destination content matches source
        with self.fs.open_input_file(dst_file) as in_file:
            read_data = in_file.read()
            self.assertEqual(read_data, test_data)
            self.assertEqual(len(read_data), len(test_data))

    def test_delete_nonexistent_file(self):
        """Test deleting non-existent file."""
        non_existent = self._get_test_path("non-existent-delete.txt")

        with self.assertRaises(IOError):
            self.fs.delete_file(non_existent)

    def test_multiple_sequential_reads(self):
        """Test multiple sequential reads from same file."""
        test_file = self._get_test_path("sequential-read.txt")
        test_data = os.urandom(10000)  # 10KB

        self._write_file(test_file, test_data)

        # Read in chunks sequentially
        with self.fs.open_input_file(test_file) as in_file:
            chunk1 = in_file.read(1000)
            chunk2 = in_file.read(2000)
            chunk3 = in_file.read(3000)
            chunk4 = in_file.read()  # Read rest

            self.assertEqual(chunk1, test_data[0:1000])
            self.assertEqual(chunk2, test_data[1000:3000])
            self.assertEqual(chunk3, test_data[3000:6000])
            self.assertEqual(chunk4, test_data[6000:])

            total_read = len(chunk1) + len(chunk2) + len(chunk3) + len(chunk4)
            self.assertEqual(total_read, len(test_data))


if __name__ == '__main__':
    unittest.main()
+################################################################################ +import os +import unittest +import uuid + +import pyarrow.fs as pafs + +from pypaimon.common.options import Options +from pypaimon.common.options.config import OssOptions +from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO + + +class OSSFileIOTest(unittest.TestCase): + """Test cases for PyArrowFileIO with OSS.""" + + def setUp(self): + """Set up test fixtures.""" + # Get OSS credentials from environment variables + self.bucket = os.environ.get("OSS_TEST_BUCKET") + access_key_id = os.environ.get("OSS_ACCESS_KEY_ID") + access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET") + endpoint = os.environ.get("OSS_ENDPOINT") + + if not self.bucket: + self.skipTest("test bucket is not configured") + return + if not access_key_id: + self.skipTest("test access key id is not configured") + return + if not access_key_secret: + self.skipTest("test access key secret is not configured") + return + if not endpoint: + self.skipTest("test endpoint is not configured") + return + + self.root_path = f"oss://{self.bucket}/" + + self.catalog_options = Options({ + OssOptions.OSS_ACCESS_KEY_ID.key(): access_key_id, + OssOptions.OSS_ACCESS_KEY_SECRET.key(): access_key_secret, + OssOptions.OSS_ENDPOINT.key(): endpoint, + }) + + # Create PyArrowFileIO instance + self.file_io = PyArrowFileIO(self.root_path, self.catalog_options) + + # Create unique test prefix to avoid conflicts + self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/" + + def tearDown(self): + """Clean up test files and directories.""" + # Delete the entire test prefix directory + test_dir = f"{self.root_path}{self.test_prefix}" + try: + if self.file_io.exists(test_dir): + self.file_io.delete(test_dir, recursive=True) + except Exception: + pass # Ignore cleanup errors + + def _get_test_path(self, name: str) -> str: + """Get a test path under test_prefix.""" + return f"{self.root_path}{self.test_prefix}{name}" + + def 
test_get_file_status_directory(self): + """Test get_file_status for a directory.""" + # Create a test directory + test_dir = self._get_test_path("test-dir/") + self.file_io.mkdirs(test_dir) + # Get file status + file_status = self.file_io.get_file_status(test_dir) + # Verify the returned file status + self.assertIsNotNone(file_status) + self.assertEqual(file_status.type, pafs.FileType.Directory) + + def test_new_input_stream_read(self): + """Test new_input_stream and read method.""" + # Create test data + test_data = b"Hello, World! This is a test file for OSS input stream." + test_file = self._get_test_path("test-input-stream.txt") + + # Write test data to file + with self.file_io.new_output_stream(test_file) as out_stream: + out_stream.write(test_data) + + # Test new_input_stream + input_stream = self.file_io.new_input_stream(test_file) + self.assertIsNotNone(input_stream) + + # Test read without nbytes (read all) + input_stream.seek(0) + read_data = input_stream.read() + self.assertEqual(read_data, test_data) + + # Test read with nbytes + input_stream.seek(0) + read_partial = input_stream.read(5) + self.assertEqual(read_partial, b"Hello") + + # Test read more bytes + read_partial2 = input_stream.read(7) + self.assertEqual(read_partial2, b", World") + + # Test read remaining + read_remaining = input_stream.read() + self.assertEqual(read_remaining, b"! 
This is a test file for OSS input stream.") + + # Verify complete data + input_stream.seek(0) + complete_data = input_stream.read() + self.assertEqual(complete_data, test_data) + + # Close the stream + input_stream.close() + + # Test context manager + with self.file_io.new_input_stream(test_file) as input_stream2: + data = input_stream2.read() + self.assertEqual(data, test_data) + + def test_new_input_stream_read_large_file(self): + """Test new_input_stream with larger file and read operations.""" + # Create larger test data (1MB) + test_data = b"X" * (1024 * 1024) + test_file = self._get_test_path("test-large-input-stream.bin") + + # Write test data + with self.file_io.new_output_stream(test_file) as out_stream: + out_stream.write(test_data) + + # Test reading in chunks + chunk_size = 64 * 1024 # 64KB chunks + with self.file_io.new_input_stream(test_file) as input_stream: + read_chunks = [] + while True: + chunk = input_stream.read(chunk_size) + if not chunk: + break + read_chunks.append(chunk) + + # Verify all data was read + read_data = b''.join(read_chunks) + self.assertEqual(len(read_data), len(test_data)) + self.assertEqual(read_data, test_data) + + # Test read_at method if available + with self.file_io.new_input_stream(test_file) as input_stream: + if hasattr(input_stream, 'read_at'): + # Read from middle of file + offset = len(test_data) // 2 + read_at_data = input_stream.read_at(1024, offset) + self.assertEqual(len(read_at_data), 1024) + self.assertEqual(read_at_data, test_data[offset:offset+1024]) + + def test_new_input_stream_file_not_found(self): + """Test new_input_stream with non-existent file.""" + non_existent_file = self._get_test_path("non-existent-file.txt") + + with self.assertRaises(FileNotFoundError): + self.file_io.new_input_stream(non_existent_file) + + def test_exists(self): + """Test exists method.""" + missing_uri = self._get_test_path("nonexistent_xyz") + self.assertFalse(self.file_io.exists(missing_uri)) + with 
self.assertRaises(FileNotFoundError): + self.file_io.get_file_status(missing_uri) + + def test_write_file_with_overwrite_flag(self): + """Test write_file with overwrite flag.""" + test_file_uri = self._get_test_path("overwrite_test.txt") + + # 1) Write to a new file with default overwrite=False + self.file_io.write_file(test_file_uri, "first content") + self.assertTrue(self.file_io.exists(test_file_uri)) + content = self.file_io.read_file_utf8(test_file_uri) + self.assertEqual(content, "first content") + + # 2) Attempt to write again with overwrite=False should raise FileExistsError + with self.assertRaises(FileExistsError): + self.file_io.write_file(test_file_uri, "second content", overwrite=False) + + # Ensure content is unchanged + content = self.file_io.read_file_utf8(test_file_uri) + self.assertEqual(content, "first content") + + # 3) Write with overwrite=True should replace the content + self.file_io.write_file(test_file_uri, "overwritten content", overwrite=True) + content = self.file_io.read_file_utf8(test_file_uri) + self.assertEqual(content, "overwritten content") + + def test_exists_does_not_catch_exception(self): + """Test that exists does not catch exceptions.""" + test_file = self._get_test_path("test_file.txt") + + # Write a test file + with self.file_io.new_output_stream(test_file) as out_stream: + out_stream.write(b"test") + self.assertTrue(self.file_io.exists(test_file)) + self.assertFalse(self.file_io.exists(self._get_test_path("nonexistent.txt"))) + + def test_delete_non_empty_directory_raises_error(self): + """Test that delete raises error for non-empty directory.""" + test_dir = self._get_test_path("test_dir/") + test_file = self._get_test_path("test_dir/test_file.txt") + + # Create directory and file + self.file_io.mkdirs(test_dir) + with self.file_io.new_output_stream(test_file) as out_stream: + out_stream.write(b"test") + + with self.assertRaises(OSError) as context: + self.file_io.delete(test_dir, recursive=False) + self.assertIn("is not 
empty", str(context.exception)) + + def test_delete_returns_false_when_file_not_exists(self): + """Test that delete returns False when file does not exist.""" + result = self.file_io.delete(self._get_test_path("nonexistent.txt")) + self.assertFalse(result, "delete() should return False when file does not exist") + + result = self.file_io.delete(self._get_test_path("nonexistent_dir"), recursive=False) + self.assertFalse(result, "delete() should return False when directory does not exist") + + def test_mkdirs_raises_error_when_path_is_file(self): + """Test that mkdirs raises error when path is a file.""" + test_file = self._get_test_path("test_file.txt") + + # Create a file + with self.file_io.new_output_stream(test_file) as out_stream: + out_stream.write(b"test") + + with self.assertRaises(FileExistsError) as context: + self.file_io.mkdirs(test_file) + self.assertIn("is not a directory", str(context.exception)) + + def test_rename_returns_false_when_dst_exists(self): + """Test that rename returns False when destination exists.""" + src_file = self._get_test_path("src.txt") + dst_file = self._get_test_path("dst.txt") + + # Create source and destination files + with self.file_io.new_output_stream(src_file) as out_stream: + out_stream.write(b"src") + with self.file_io.new_output_stream(dst_file) as out_stream: + out_stream.write(b"dst") + + result = self.file_io.rename(src_file, dst_file) + self.assertFalse(result) + + def test_get_file_status_raises_error_when_file_not_exists(self): + """Test that get_file_status raises error when file does not exist.""" + with self.assertRaises(FileNotFoundError) as context: + self.file_io.get_file_status(self._get_test_path("nonexistent.txt")) + self.assertIn("does not exist", str(context.exception)) + + test_file = self._get_test_path("test_file.txt") + with self.file_io.new_output_stream(test_file) as out_stream: + out_stream.write(b"test content") + + file_info = self.file_io.get_file_status(test_file) + 
self.assertEqual(file_info.type, pafs.FileType.File) + self.assertIsNotNone(file_info.size) + + with self.assertRaises(FileNotFoundError) as context: + self.file_io.get_file_size(self._get_test_path("nonexistent.txt")) + self.assertIn("does not exist", str(context.exception)) + + with self.assertRaises(FileNotFoundError) as context: + self.file_io.is_dir(self._get_test_path("nonexistent_dir")) + self.assertIn("does not exist", str(context.exception)) + + def test_copy_file(self): + """Test copy_file method.""" + source_file = self._get_test_path("source.txt") + target_file = self._get_test_path("target.txt") + + # Create source file + with self.file_io.new_output_stream(source_file) as out_stream: + out_stream.write(b"source content") + + # Test 1: Raises FileExistsError when target exists and overwrite=False + with self.file_io.new_output_stream(target_file) as out_stream: + out_stream.write(b"target content") + + with self.assertRaises(FileExistsError) as context: + self.file_io.copy_file(source_file, target_file, overwrite=False) + self.assertIn("already exists", str(context.exception)) + + # Verify target content unchanged + with self.file_io.new_input_stream(target_file) as in_stream: + content = in_stream.read() + self.assertEqual(content, b"target content") + + # Test 2: Overwrites when overwrite=True + self.file_io.copy_file(source_file, target_file, overwrite=True) + with self.file_io.new_input_stream(target_file) as in_stream: + content = in_stream.read() + self.assertEqual(content, b"source content") + + # Test 3: Creates parent directory if it doesn't exist + target_file_in_subdir = self._get_test_path("subdir/target.txt") + self.file_io.copy_file(source_file, target_file_in_subdir, overwrite=False) + self.assertTrue(self.file_io.exists(target_file_in_subdir)) + with self.file_io.new_input_stream(target_file_in_subdir) as in_stream: + content = in_stream.read() + self.assertEqual(content, b"source content") + + def test_try_to_write_atomic(self): + 
"""Test try_to_write_atomic method.""" + target_dir = self._get_test_path("target_dir/") + normal_file = self._get_test_path("normal_file.txt") + + # Create target directory + self.file_io.mkdirs(target_dir) + self.assertFalse( + self.file_io.try_to_write_atomic(target_dir, "test content"), + "PyArrowFileIO should return False when target is a directory") + + # Verify no file was created inside the directory + # List directory contents to verify it's empty + selector = pafs.FileSelector(self.file_io.to_filesystem_path(target_dir), recursive=False, allow_not_found=True) + dir_contents = self.file_io.filesystem.get_file_info(selector) + self.assertEqual(len(dir_contents), 0, "No file should be created inside the directory") + + self.assertTrue(self.file_io.try_to_write_atomic(normal_file, "test content")) + content = self.file_io.read_file_utf8(normal_file) + self.assertEqual(content, "test content") + + # Delete and test again + self.file_io.delete(normal_file) + self.assertFalse( + self.file_io.try_to_write_atomic(target_dir, "test content"), + "PyArrowFileIO should return False when target is a directory") + + # Verify no file was created inside the directory + dir_contents = self.file_io.filesystem.get_file_info(selector) + self.assertEqual(len(dir_contents), 0, "No file should be created inside the directory") + + self.assertTrue(self.file_io.try_to_write_atomic(normal_file, "test content")) + content = self.file_io.read_file_utf8(normal_file) + self.assertEqual(content, "test content") + +if __name__ == '__main__': + unittest.main() From 0de9949610a33fd5f8a81f9dd193aeb09deb41a2 Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Thu, 12 Mar 2026 14:50:41 +0800 Subject: [PATCH 2/5] Fix lint check --- .../filesystem/jindo_file_system_handler.py | 12 +- .../pypaimon/filesystem/pyarrow_file_io.py | 3 +- .../pypaimon/tests/jindo_file_system_test.py | 128 +++++++++--------- 3 files changed, 68 insertions(+), 75 deletions(-) diff --git 
a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py index 9fa98cc71454..aa732f471bd3 100644 --- a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py +++ b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py @@ -34,6 +34,7 @@ from pypaimon.common.options import Options from pypaimon.common.options.config import OssOptions + class JindoInputFile: def __init__(self, jindo_stream): self._stream = jindo_stream @@ -117,6 +118,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): return False + class JindoFileSystemHandler(FileSystemHandler): def __init__(self, root_path: str, catalog_options: Options): if not JINDO_AVAILABLE: @@ -205,7 +207,7 @@ def get_file_info_selector(self, selector: FileSelector) -> list: try: items = self._jindo_fs.listdir(normalized, recursive=selector.recursive) return [self._convert_file_info(item) for item in items] - except FileNotFoundError as e: + except FileNotFoundError: if selector.allow_not_found: return [] raise @@ -256,13 +258,6 @@ def copy_file(self, src: str, dest: str): src_norm = self._normalize_path(src) dst_norm = self._normalize_path(dest) self._jindo_fs.copy_file(src_norm, dst_norm) - # with self._jindo_fs.open(src_norm, "rb") as src_f: - # with self._jindo_fs.open(dst_norm, "wb") as dst_f: - # while True: - # chunk = src_f.read(1024 * 1024) # 1 MB - # if not chunk: - # break - # dst_f.write(chunk) def open_input_stream(self, path: str): normalized = self._normalize_path(path) @@ -284,4 +279,3 @@ def open_append_stream(self, path: str, metadata): def normalize_path(self, path: str) -> str: return self._normalize_path(path) - diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index 384cbe7b6c6a..d5cb8f7b6f66 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -47,6 
+47,7 @@ JINDO_AVAILABLE = False JindoFileSystemHandler = None + def _pyarrow_lt_7(): return parse(pyarrow.__version__) < parse("7.0.0") @@ -569,7 +570,7 @@ def to_filesystem_path(self, path: str) -> str: # For JindoFileSystem, pass key only path_part = normalized_path.lstrip('/') return path_part if path_part else '.' - + if isinstance(self.filesystem, S3FileSystem): if parsed.scheme: if parsed.netloc: diff --git a/paimon-python/pypaimon/tests/jindo_file_system_test.py b/paimon-python/pypaimon/tests/jindo_file_system_test.py index 4a8dce59c401..d2dc872d89ac 100644 --- a/paimon-python/pypaimon/tests/jindo_file_system_test.py +++ b/paimon-python/pypaimon/tests/jindo_file_system_test.py @@ -16,8 +16,6 @@ # limitations under the License. ################################################################################ import os -import random -import time import unittest import uuid @@ -28,6 +26,7 @@ from pypaimon.common.options.config import OssOptions from pypaimon.filesystem.jindo_file_system_handler import JindoFileSystemHandler, JINDO_AVAILABLE + class JindoFileSystemTest(unittest.TestCase): """Test cases for JindoFileSystem.""" @@ -35,7 +34,7 @@ def setUp(self): """Set up test fixtures.""" if not JINDO_AVAILABLE: self.skipTest("pyjindo is not available") - + # Get OSS credentials from environment variables or use test values bucket = os.environ.get("OSS_TEST_BUCKET") access_key_id = os.environ.get("OSS_ACCESS_KEY_ID") @@ -54,18 +53,17 @@ def setUp(self): self.skipTest("test endpoint is not configured") return self.root_path = f"oss://{bucket}/" - - + self.catalog_options = Options({ OssOptions.OSS_ACCESS_KEY_ID.key(): access_key_id, OssOptions.OSS_ACCESS_KEY_SECRET.key(): access_key_secret, OssOptions.OSS_ENDPOINT.key(): endpoint, }) - + # Create JindoFileSystemHandler instance fs_handler = JindoFileSystemHandler(self.root_path, self.catalog_options) self.fs = PyFileSystem(fs_handler) - + # Create unique test prefix to avoid conflicts self.test_prefix = 
f"test-{uuid.uuid4().hex[:8]}/" @@ -91,7 +89,7 @@ def test_get_root_file_info(self): """Test get_file_info for root path.""" # Verify directory exists using get_file_info file_info = self.fs.get_file_info(self.root_path) - + # Verify the returned file info self.assertIsInstance(file_info, pafs.FileInfo) self.assertEqual(file_info.type, pafs.FileType.Directory) @@ -99,10 +97,10 @@ def test_get_root_file_info(self): def test_create_dir_recursive(self): """Test create_dir with recursive=True.""" test_dir = self._get_test_path("nested/deep/dir/") - + # Create nested directory self.fs.create_dir(test_dir, recursive=True) - + # Verify directory exists file_info = self.fs.get_file_info(test_dir) self.assertEqual(file_info.type, pafs.FileType.Directory) @@ -111,20 +109,20 @@ def test_write_and_read_small_file(self): """Test writing and reading a small file (10 bytes) with random data.""" test_file = self._get_test_path("small-file.txt") test_data = os.urandom(10) - + # Write file out_stream = self.fs.open_output_stream(test_file) out_stream.write(test_data) out_stream.close() - + # Read file with self.fs.open_input_file(test_file) as in_file: read_data = in_file.read() - + # Verify data correctness self.assertEqual(test_data, read_data) self.assertEqual(len(read_data), 10) - + # Verify file info file_info = self.fs.get_file_info(test_file) self.assertEqual(file_info.type, pafs.FileType.File) @@ -135,7 +133,7 @@ def test_write_and_read_large_file(self): test_file = self._get_test_path("large-file.bin") file_size = 20 * 1024 * 1024 # 20MB test_data = os.urandom(file_size) - + # Write file in chunks chunk_size = 1024 * 1024 # 1MB chunks out_stream = self.fs.open_output_stream(test_file) @@ -143,53 +141,53 @@ def test_write_and_read_large_file(self): chunk = test_data[i:i + chunk_size] out_stream.write(chunk) out_stream.close() - + # Verify file info file_info = self.fs.get_file_info(test_file) self.assertEqual(file_info.type, pafs.FileType.File) 
self.assertEqual(file_info.size, file_size) - + # Test JindoInputFile all methods with self.fs.open_input_file(test_file) as in_file: # Test tell() - should be at position 0 initially self.assertEqual(in_file.tell(), 0) - + # Test read() with specific size chunk1 = in_file.read(1024) self.assertEqual(len(chunk1), 1024) self.assertEqual(in_file.tell(), 1024) self.assertEqual(chunk1, test_data[0:1024]) - + # Test seek() - seek to middle of file middle_pos = file_size // 2 in_file.seek(middle_pos) self.assertEqual(in_file.tell(), middle_pos) - + # Test read_at() read_at_data = in_file.read_at(1024, 0) self.assertEqual(len(read_at_data), 1024) self.assertEqual(read_at_data, test_data[0:1024]) - + # Test read_at() at different offset offset = file_size - 2048 read_at_data2 = in_file.read_at(1024, offset) self.assertEqual(len(read_at_data2), 1024) self.assertEqual(read_at_data2, test_data[offset:offset + 1024]) - + # Test seek() to end in_file.seek(file_size) self.assertEqual(in_file.tell(), file_size) - + # Test read() at end - should return empty bytes empty_data = in_file.read(100) self.assertEqual(len(empty_data), 0) - + # Test read() without size - read remaining in_file.seek(0) all_data = in_file.read() self.assertEqual(len(all_data), file_size) self.assertEqual(all_data, test_data) - + # Verify complete file read with self.fs.open_input_file(test_file) as in_file: complete_data = in_file.read() @@ -200,12 +198,12 @@ def test_get_file_info_single_path(self): """Test get_file_info with single path.""" test_file = self._get_test_path("info-test.txt") test_data = b"test content" - + # Write file out_stream = self.fs.open_output_stream(test_file) out_stream.write(test_data) out_stream.close() - + # Get file info file_info = self.fs.get_file_info(test_file) self.assertIsInstance(file_info, pafs.FileInfo) @@ -216,13 +214,13 @@ def test_get_file_info_list(self): """Test get_file_info with list of paths.""" test_file = self._get_test_path("info-test1.txt") test_dir = 
self._get_test_path("dir1") - + # Write files out_stream = self.fs.open_output_stream(test_file) out_stream.write(b"content1") out_stream.close() self.fs.create_dir(test_dir) - + # Get file info for list file_infos = self.fs.get_file_info([test_file, test_dir]) self.assertIsInstance(file_infos, list) @@ -237,7 +235,7 @@ def test_get_file_info_with_selector(self): test_dir = self._get_test_path("selector-test/") test_file1 = self._get_test_path("selector-test/file1.txt") test_file2 = self._get_test_path("selector-test/file2.txt") - + # Create files self.fs.create_dir(test_dir) out_stream1 = self.fs.open_output_stream(test_file1) @@ -246,13 +244,13 @@ def test_get_file_info_with_selector(self): out_stream2 = self.fs.open_output_stream(test_file2) out_stream2.write(b"content2") out_stream2.close() - + # Test non-recursive listing selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False) file_infos = self.fs.get_file_info(selector) self.assertIsInstance(file_infos, list) self.assertEqual(len(file_infos), 2) - + # Verify we got the files file_names = [info.path for info in file_infos] self.assertTrue(any("file1.txt" in name for name in file_names)) @@ -264,7 +262,7 @@ def test_get_file_info_with_selector_recursive(self): test_subdir = self._get_test_path("selector-recursive/subdir/") test_file1 = self._get_test_path("selector-recursive/file1.txt") test_file2 = self._get_test_path("selector-recursive/subdir/file2.txt") - + # Create directory structure self.fs.create_dir(test_dir) self.fs.create_dir(test_subdir) @@ -274,7 +272,7 @@ def test_get_file_info_with_selector_recursive(self): out_stream2 = self.fs.open_output_stream(test_file2) out_stream2.write(b"content2") out_stream2.close() - + # Test recursive listing selector = pafs.FileSelector(test_dir, recursive=True, allow_not_found=False) file_infos = self.fs.get_file_info(selector) @@ -287,7 +285,7 @@ def test_get_file_info_not_found(self): file_info = self.fs.get_file_info(non_existent) 
self.assertEqual(file_info.type, pafs.FileType.NotFound) - + # Try to open non-existent file should raise FileNotFoundError with self.assertRaises(FileNotFoundError): with self.fs.open_input_file(non_existent) as in_file: @@ -296,7 +294,7 @@ def test_get_file_info_not_found(self): def test_get_file_info_selector_allow_not_found(self): """Test get_file_info with FileSelector allow_not_found=True.""" non_existent_dir = self._get_test_path("non-existent-dir/") - + # Test with allow_not_found=True selector = pafs.FileSelector(non_existent_dir, recursive=False, allow_not_found=True) file_infos = self.fs.get_file_info(selector) @@ -306,19 +304,19 @@ def test_get_file_info_selector_allow_not_found(self): def test_delete_file(self): """Test delete_file method.""" test_file = self._get_test_path("delete-test.txt") - + # Create file out_stream = self.fs.open_output_stream(test_file) out_stream.write(b"test content") out_stream.close() - + # Verify file exists file_info = self.fs.get_file_info(test_file) self.assertEqual(file_info.type, pafs.FileType.File) - + # Delete file self.fs.delete_file(test_file) - + # Verify file is deleted - should raise FileNotFoundError when accessing with self.assertRaises(FileNotFoundError): with self.fs.open_input_file(test_file) as in_file: @@ -327,17 +325,17 @@ def test_delete_file(self): def test_delete_dir(self): """Test delete_dir method.""" test_dir = self._get_test_path("delete-dir/") - + # Create directory self.fs.create_dir(test_dir) - + # Verify directory exists file_info = self.fs.get_file_info(test_dir) self.assertEqual(file_info.type, pafs.FileType.Directory) - + # Delete directory self.fs.delete_dir(test_dir) - + # Verify directory is deleted - should raise FileNotFoundError when accessing with self.assertRaises(FileNotFoundError): # Try to list directory contents @@ -351,7 +349,7 @@ def test_delete_dir_contents(self): test_file2 = self._get_test_path("delete-contents/file2.txt") test_subdir = 
self._get_test_path("delete-contents/subdir/") test_file3 = self._get_test_path("delete-contents/subdir/file3.txt") - + # Create directory structure self.fs.create_dir(test_dir) out_stream1 = self.fs.open_output_stream(test_file1) @@ -364,10 +362,10 @@ def test_delete_dir_contents(self): out_stream3 = self.fs.open_output_stream(test_file3) out_stream3.write(b"content3") out_stream3.close() - + # Delete directory contents self.fs.delete_dir_contents(test_dir) - + # Verify directory still exists but is empty file_info = self.fs.get_file_info(test_dir) self.assertEqual(file_info.type, pafs.FileType.Directory) @@ -391,24 +389,24 @@ def test_move_file(self): src_file = self._get_test_path("move-src.txt") dst_file = self._get_test_path("move-dst.txt") test_data = b"move test content" - + # Create source file out_stream = self.fs.open_output_stream(src_file) out_stream.write(test_data) out_stream.close() - + # Move file self.fs.move(src_file, dst_file) - + # Verify source is gone - should raise FileNotFoundError when accessing with self.assertRaises(FileNotFoundError): with self.fs.open_input_file(src_file) as in_file: in_file.read() - + # Verify destination exists with correct content dst_info = self.fs.get_file_info(dst_file) self.assertEqual(dst_info.type, pafs.FileType.File) - + with self.fs.open_input_file(dst_file) as in_file: read_data = in_file.read() self.assertEqual(read_data, test_data) @@ -418,22 +416,22 @@ def test_move_directory(self): src_dir = self._get_test_path("move-src-dir/") dst_dir = self._get_test_path("move-dst-dir/") src_file = self._get_test_path("move-src-dir/file.txt") - + # Create source directory and file self.fs.create_dir(src_dir) out_stream = self.fs.open_output_stream(src_file) out_stream.write(b"test content") out_stream.close() - + # Move directory self.fs.move(src_dir, dst_dir) - + # Verify source is gone - should raise FileNotFoundError when accessing with self.assertRaises(FileNotFoundError): # Try to list directory contents selector 
= pafs.FileSelector(src_dir, recursive=False, allow_not_found=False) self.fs.get_file_info(selector) - + # Verify destination exists dst_info = self.fs.get_file_info(dst_dir) self.assertEqual(dst_info.type, pafs.FileType.Directory) @@ -443,21 +441,21 @@ def test_copy_file(self): src_file = self._get_test_path("copy-src.txt") dst_file = self._get_test_path("copy-dst.txt") test_data = os.urandom(1024 * 1024) # 1MB random data - + # Create source file out_stream = self.fs.open_output_stream(src_file) out_stream.write(test_data) out_stream.close() - + # Copy file self.fs.copy_file(src_file, dst_file) - + # Verify both files exist src_info = self.fs.get_file_info(src_file) self.assertEqual(src_info.type, pafs.FileType.File) dst_info = self.fs.get_file_info(dst_file) self.assertEqual(dst_info.type, pafs.FileType.File) - + # Verify destination content matches source with self.fs.open_input_file(dst_file) as in_file: read_data = in_file.read() @@ -467,7 +465,7 @@ def test_copy_file(self): def test_delete_nonexistent_file(self): """Test deleting non-existent file.""" non_existent = self._get_test_path("non-existent-delete.txt") - + # Try to delete non-existent file with self.assertRaises(IOError): self.fs.delete_file(non_existent) @@ -476,25 +474,25 @@ def test_multiple_sequential_reads(self): """Test multiple sequential reads from same file.""" test_file = self._get_test_path("sequential-read.txt") test_data = os.urandom(10000) # 10KB - + # Write file out_stream = self.fs.open_output_stream(test_file) out_stream.write(test_data) out_stream.close() - + # Read in chunks sequentially with self.fs.open_input_file(test_file) as in_file: chunk1 = in_file.read(1000) chunk2 = in_file.read(2000) chunk3 = in_file.read(3000) chunk4 = in_file.read() # Read rest - + # Verify all chunks self.assertEqual(chunk1, test_data[0:1000]) self.assertEqual(chunk2, test_data[1000:3000]) self.assertEqual(chunk3, test_data[3000:6000]) self.assertEqual(chunk4, test_data[6000:]) - + # Verify total 
length total_read = len(chunk1) + len(chunk2) + len(chunk3) + len(chunk4) self.assertEqual(total_read, len(test_data)) From 02f56d88f700f891a3263e5f3bbe2e37e04e2b9c Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Thu, 12 Mar 2026 15:09:18 +0800 Subject: [PATCH 3/5] Refactor some comments --- paimon-python/pypaimon/filesystem/jindo_file_system_handler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py index aa732f471bd3..2bee1217e2ac 100644 --- a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py +++ b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py @@ -128,7 +128,7 @@ def __init__(self, root_path: str, catalog_options: Options): self.root_path = root_path self.properties = catalog_options - # 从 catalog_options 构建 jindo 配置 + # Build jindo config from catalog_options config = jutil.Config() access_key_id = catalog_options.get(OssOptions.OSS_ACCESS_KEY_ID) From e90b3eaf3841d262dd74c5b274e34660c3cd187f Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Wed, 18 Mar 2026 11:30:30 +0800 Subject: [PATCH 4/5] Add config to decide which oss impl to use --- .../pypaimon/common/options/config.py | 2 + .../pypaimon/filesystem/pyarrow_file_io.py | 10 ++-- paimon-python/pypaimon/tests/file_io_test.py | 46 +++++++++---------- .../pypaimon/tests/oss_file_io_test.py | 4 +- 4 files changed, 32 insertions(+), 30 deletions(-) diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py index fb6a46446ea0..8b0230da64cb 100644 --- a/paimon-python/pypaimon/common/options/config.py +++ b/paimon-python/pypaimon/common/options/config.py @@ -18,6 +18,8 @@ class OssOptions: + OSS_IMPL = ConfigOptions.key("fs.oss.impl").string_type().default_value("default").with_description( + "OSS filesystem implementation: default or jindo") OSS_ACCESS_KEY_ID = 
ConfigOptions.key("fs.oss.accessKeyId").string_type().no_default_value().with_description( "OSS access key ID") OSS_ACCESS_KEY_SECRET = ConfigOptions.key( diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index d5cb8f7b6f66..77479ab691b9 100644 --- a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -62,11 +62,11 @@ def __init__(self, path: str, catalog_options: Options): self.uri_reader_factory = UriReaderFactory(catalog_options) self._is_oss = scheme in {"oss"} self._oss_bucket = None - self.use_jindo_fs = False + self._oss_impl = self.properties.get(OssOptions.OSS_IMPL) if self._is_oss: self._oss_bucket = self._extract_oss_bucket(path) # Try to use JindoFileSystem if available, otherwise fall back to S3FileSystem - if JINDO_AVAILABLE: + if self._oss_impl == "jindo" and JINDO_AVAILABLE: self.filesystem = self._initialize_jindo_fs(path) else: self.filesystem = self._initialize_oss_fs(path) @@ -129,9 +129,9 @@ def _extract_oss_bucket(self, location) -> str: def _initialize_jindo_fs(self, path) -> FileSystem: """Initialize JindoFileSystem for OSS access.""" + self.logger.info(f"Initializing JindoFileSystem for OSS access: {path}") root_path = f"oss://{self._oss_bucket}/" fs_handler = JindoFileSystemHandler(root_path, self.properties) - self.use_jindo_fs = True return pafs.PyFileSystem(fs_handler) def _initialize_oss_fs(self, path) -> FileSystem: @@ -202,7 +202,7 @@ def new_input_stream(self, path: str): def new_output_stream(self, path: str): path_str = self.to_filesystem_path(path) - if self.use_jindo_fs: + if self._oss_impl == "jindo": pass elif self._is_oss and not self._pyarrow_gte_7: # For PyArrow 6.x + OSS, path_str is already just the key part @@ -566,7 +566,7 @@ def to_filesystem_path(self, path: str) -> str: path_part = normalized_path.lstrip('/') return f"{drive_letter}:/{path_part}" if path_part else f"{drive_letter}:" - 
if self.use_jindo_fs: + if self._oss_impl == "jindo": # For JindoFileSystem, pass key only path_part = normalized_path.lstrip('/') return path_part if path_part else '.' diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py index 52d93d176eba..5cc4d7a82164 100644 --- a/paimon-python/pypaimon/tests/file_io_test.py +++ b/paimon-python/pypaimon/tests/file_io_test.py @@ -71,10 +71,9 @@ def test_filesystem_path_conversion(self): OssOptions.OSS_ACCESS_KEY_ID.key(): 'test-key', OssOptions.OSS_ACCESS_KEY_SECRET.key(): 'test-secret', })) - use_jindo_fs = oss_io.use_jindo_fs got = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") - self.assertEqual(got, "path/to/file.txt" if lt7 or use_jindo_fs else "test-bucket/path/to/file.txt") - if lt7 or use_jindo_fs: + self.assertEqual(got, "path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt") + if lt7: self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx/data.parquet"), "db-xxx.db/tbl-xxx/data.parquet") self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx"), "db-xxx.db/tbl-xxx") @@ -91,27 +90,26 @@ def record_get_file_info(paths): get_file_info_calls.append(list(paths)) return [MagicMock(type=pafs.FileType.NotFound) for _ in paths] - if not use_jindo_fs: - mock_fs = MagicMock() - mock_fs.get_file_info.side_effect = record_get_file_info if lt7 else [[nf], [nf]] - mock_fs.create_dir = MagicMock() - mock_fs.open_output_stream.return_value = MagicMock() - oss_io.filesystem = mock_fs - oss_io.new_output_stream("oss://test-bucket/path/to/file.txt") - mock_fs.create_dir.assert_called_once() - path_str = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") - if lt7: - expected_parent = '/'.join(path_str.split('/')[:-1]) if '/' in path_str else '' - else: - expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in path_str else str(Path(path_str).parent) - self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent) - if lt7: - 
for call_paths in get_file_info_calls: - for p in call_paths: - self.assertFalse( - p.startswith("test-bucket/"), - "OSS+PyArrow<7 must pass key only to get_file_info, not bucket/key. Got: %r" % (p,) - ) + mock_fs = MagicMock() + mock_fs.get_file_info.side_effect = record_get_file_info if lt7 else [[nf], [nf]] + mock_fs.create_dir = MagicMock() + mock_fs.open_output_stream.return_value = MagicMock() + oss_io.filesystem = mock_fs + oss_io.new_output_stream("oss://test-bucket/path/to/file.txt") + mock_fs.create_dir.assert_called_once() + path_str = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") + if lt7: + expected_parent = '/'.join(path_str.split('/')[:-1]) if '/' in path_str else '' + else: + expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in path_str else str(Path(path_str).parent) + self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent) + if lt7: + for call_paths in get_file_info_calls: + for p in call_paths: + self.assertFalse( + p.startswith("test-bucket/"), + "OSS+PyArrow<7 must pass key only to get_file_info, not bucket/key. 
Got: %r" % (p,) + ) def test_exists(self): lt7 = _pyarrow_lt_7() diff --git a/paimon-python/pypaimon/tests/oss_file_io_test.py b/paimon-python/pypaimon/tests/oss_file_io_test.py index f5f2642f2093..f9602c975ef8 100644 --- a/paimon-python/pypaimon/tests/oss_file_io_test.py +++ b/paimon-python/pypaimon/tests/oss_file_io_test.py @@ -36,7 +36,8 @@ def setUp(self): access_key_id = os.environ.get("OSS_ACCESS_KEY_ID") access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET") endpoint = os.environ.get("OSS_ENDPOINT") - + oss_impl = os.environ.get("OSS_IMPL", "default") + if not self.bucket: self.skipTest("test bucket is not configured") return @@ -56,6 +57,7 @@ def setUp(self): OssOptions.OSS_ACCESS_KEY_ID.key(): access_key_id, OssOptions.OSS_ACCESS_KEY_SECRET.key(): access_key_secret, OssOptions.OSS_ENDPOINT.key(): endpoint, + OssOptions.OSS_IMPL.key(): oss_impl, }) # Create PyArrowFileIO instance From 97902e1be8840cdb485f64c0f3ab1845c6604e09 Mon Sep 17 00:00:00 2001 From: "shunyang.ysy" Date: Wed, 18 Mar 2026 14:42:26 +0800 Subject: [PATCH 5/5] Refactor jindo output file exit --- .../filesystem/jindo_file_system_handler.py | 1 + .../pypaimon/tests/jindo_file_system_test.py | 22 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py index 2bee1217e2ac..0c9768e08e0a 100644 --- a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py +++ b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py @@ -116,6 +116,7 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): + self.close() return False diff --git a/paimon-python/pypaimon/tests/jindo_file_system_test.py b/paimon-python/pypaimon/tests/jindo_file_system_test.py index d2dc872d89ac..b6f168ac5823 100644 --- a/paimon-python/pypaimon/tests/jindo_file_system_test.py +++ b/paimon-python/pypaimon/tests/jindo_file_system_test.py @@ -128,6 +128,28 
@@ def test_write_and_read_small_file(self): self.assertEqual(file_info.type, pafs.FileType.File) self.assertEqual(file_info.size, 10) + def test_write_and_read_small_file_with_context(self): + """Test writing and reading a small file (10 bytes) with random data using context manager.""" + test_file = self._get_test_path("small-file-with-context.txt") + test_data = os.urandom(10) + + # Write file using context manager + with self.fs.open_output_stream(test_file) as out_stream: + out_stream.write(test_data) + + # Read file + with self.fs.open_input_file(test_file) as in_file: + read_data = in_file.read() + + # Verify data correctness + self.assertEqual(test_data, read_data) + self.assertEqual(len(read_data), 10) + + # Verify file info + file_info = self.fs.get_file_info(test_file) + self.assertEqual(file_info.type, pafs.FileType.File) + self.assertEqual(file_info.size, 10) + def test_write_and_read_large_file(self): """Test writing and reading a large file (20MB) with random data.""" test_file = self._get_test_path("large-file.bin")