diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py index fb6a46446ea0..8b0230da64cb 100644 --- a/paimon-python/pypaimon/common/options/config.py +++ b/paimon-python/pypaimon/common/options/config.py @@ -18,6 +18,8 @@ class OssOptions: + OSS_IMPL = ConfigOptions.key("fs.oss.impl").string_type().default_value("default").with_description( + "OSS filesystem implementation: default or jindo") OSS_ACCESS_KEY_ID = ConfigOptions.key("fs.oss.accessKeyId").string_type().no_default_value().with_description( "OSS access key ID") OSS_ACCESS_KEY_SECRET = ConfigOptions.key( diff --git a/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py new file mode 100644 index 000000000000..0c9768e08e0a --- /dev/null +++ b/paimon-python/pypaimon/filesystem/jindo_file_system_handler.py @@ -0,0 +1,282 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import logging + +import pyarrow as pa +from pyarrow import PythonFile +from pyarrow._fs import FileSystemHandler +from pyarrow.fs import FileInfo, FileSelector, FileType + +try: + import pyjindo.fs as jfs + import pyjindo.util as jutil + JINDO_AVAILABLE = True +except ImportError: + JINDO_AVAILABLE = False + jfs = None + jutil = None + +from pypaimon.common.options import Options +from pypaimon.common.options.config import OssOptions + + +class JindoInputFile: + def __init__(self, jindo_stream): + self._stream = jindo_stream + self._closed = False + + @property + def closed(self): + if hasattr(self._stream, 'closed'): + return self._stream.closed + return self._closed + + def read(self, nbytes: int = -1): + if self.closed: + raise ValueError("I/O operation on closed file") + if nbytes is None or nbytes < 0: + return self._stream.read() + return self._stream.read(nbytes) + + def seek(self, position: int, whence: int = 0): + if self.closed: + raise ValueError("I/O operation on closed file") + self._stream.seek(position, whence) + + def tell(self) -> int: + if self.closed: + raise ValueError("I/O operation on closed file") + return self._stream.tell() + + def read_at(self, nbytes: int, offset: int): + if self.closed: + raise ValueError("I/O operation on closed file") + return self._stream.pread(nbytes, offset) + + def close(self): + if not self._closed: + self._stream.close() + self._closed = True + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + return False + + +class JindoOutputFile: + def __init__(self, jindo_stream): + self._stream = jindo_stream + self._closed = False + + @property + def closed(self): + if hasattr(self._stream, 'closed'): + return self._stream.closed + return self._closed + + def write(self, data: bytes) -> int: + if self.closed: + raise ValueError("I/O operation on closed file") + if isinstance(data, 
pa.Buffer): + data = data.to_pybytes() + elif not isinstance(data, bytes): + raise TypeError("Unsupported data type") + return self._stream.write(data) + + def flush(self): + if self.closed: + raise ValueError("I/O operation on closed file") + if hasattr(self._stream, 'flush'): + self._stream.flush() + + def close(self): + if not self._closed: + self._stream.close() + self._closed = True + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + return False + + +class JindoFileSystemHandler(FileSystemHandler): + def __init__(self, root_path: str, catalog_options: Options): + if not JINDO_AVAILABLE: + raise ImportError("pyjindo is not available. Please install pyjindo.") + + self.logger = logging.getLogger(__name__) + self.root_path = root_path + self.properties = catalog_options + + # Build jindo config from catalog_options + config = jutil.Config() + + access_key_id = catalog_options.get(OssOptions.OSS_ACCESS_KEY_ID) + access_key_secret = catalog_options.get(OssOptions.OSS_ACCESS_KEY_SECRET) + security_token = catalog_options.get(OssOptions.OSS_SECURITY_TOKEN) + endpoint = catalog_options.get(OssOptions.OSS_ENDPOINT) + region = catalog_options.get(OssOptions.OSS_REGION) + + if access_key_id: + config.set("fs.oss.accessKeyId", access_key_id) + if access_key_secret: + config.set("fs.oss.accessKeySecret", access_key_secret) + if security_token: + config.set("fs.oss.securityToken", security_token) + if endpoint: + endpoint_clean = endpoint.replace('http://', '').replace('https://', '') + config.set("fs.oss.endpoint", endpoint_clean) + if region: + config.set("fs.oss.region", region) + + self._jindo_fs = jfs.connect(self.root_path, "root", config) + + def __eq__(self, other): + if isinstance(other, JindoFileSystemHandler): + return self.root_path == other.root_path + return NotImplemented + + def __ne__(self, other): + if isinstance(other, JindoFileSystemHandler): + return not self.__eq__(other) + return NotImplemented + 
+ def _normalize_path(self, path: str) -> str: + if path.startswith('oss://'): + return path + + if not path or path == '.': + return self.root_path.rstrip('/') + '/' + + path_clean = path.lstrip('/') + return self.root_path.rstrip('/') + '/' + path_clean + + def _convert_file_type(self, jindo_type) -> FileType: + if jindo_type == jfs.FileType.File: + return FileType.File + elif jindo_type == jfs.FileType.Directory: + return FileType.Directory + else: + return FileType.Unknown + + def _convert_file_info(self, jindo_info) -> FileInfo: + pa_type = self._convert_file_type(jindo_info.type) + return FileInfo( + path=jindo_info.path, + type=pa_type, + size=jindo_info.size if jindo_info.type == jfs.FileType.File else None, + mtime=jindo_info.mtime if hasattr(jindo_info, 'mtime') else None, + ) + + def get_type_name(self) -> str: + return "jindo" + + def get_file_info(self, paths) -> list: + infos = [] + for path in paths: + normalized = self._normalize_path(path) + try: + jindo_info = self._jindo_fs.get_file_info(normalized) + infos.append(self._convert_file_info(jindo_info)) + except FileNotFoundError: + infos.append(FileInfo(normalized, FileType.NotFound)) + return infos + + def get_file_info_selector(self, selector: FileSelector) -> list: + normalized = self._normalize_path(selector.base_dir) + try: + items = self._jindo_fs.listdir(normalized, recursive=selector.recursive) + return [self._convert_file_info(item) for item in items] + except FileNotFoundError: + if selector.allow_not_found: + return [] + raise + + def create_dir(self, path: str, recursive: bool): + normalized = self._normalize_path(path) + self._jindo_fs.mkdir(normalized) + + def delete_dir(self, path: str): + normalized = self._normalize_path(path) + self._jindo_fs.remove(normalized) + + def delete_dir_contents(self, path: str, missing_dir_ok: bool = False): + normalized = self._normalize_path(path) + if normalized == self.root_path: + raise ValueError( + "delete_dir_contents() does not accept root 
path" + ) + self._delete_dir_contents(path, missing_dir_ok) + + def delete_root_dir_contents(self): + self._delete_dir_contents("/", missing_dir_ok=False) + + def _delete_dir_contents(self, path: str, missing_dir_ok: bool): + normalized = self._normalize_path(path) + try: + items = self._jindo_fs.listdir(normalized, recursive=False) + except FileNotFoundError: + if missing_dir_ok: + return + raise + except Exception as e: + self.logger.warning(f"Error listing {path}: {e}") + raise + for item in items: + self._jindo_fs.remove(item.path) + + def delete_file(self, path: str): + normalized = self._normalize_path(path) + self._jindo_fs.remove(normalized) + + def move(self, src: str, dest: str): + src_norm = self._normalize_path(src) + dst_norm = self._normalize_path(dest) + self._jindo_fs.rename(src_norm, dst_norm) + + def copy_file(self, src: str, dest: str): + src_norm = self._normalize_path(src) + dst_norm = self._normalize_path(dest) + self._jindo_fs.copy_file(src_norm, dst_norm) + + def open_input_stream(self, path: str): + normalized = self._normalize_path(path) + jindo_stream = self._jindo_fs.open(normalized, "rb") + return PythonFile(JindoInputFile(jindo_stream), mode="r") + + def open_input_file(self, path: str): + normalized = self._normalize_path(path) + jindo_stream = self._jindo_fs.open(normalized, "rb") + return PythonFile(JindoInputFile(jindo_stream), mode="r") + + def open_output_stream(self, path: str, metadata): + normalized = self._normalize_path(path) + jindo_stream = self._jindo_fs.open(normalized, "wb") + return PythonFile(JindoOutputFile(jindo_stream), mode="w") + + def open_append_stream(self, path: str, metadata): + raise IOError("append mode is not supported") + + def normalize_path(self, path: str) -> str: + return self._normalize_path(path) diff --git a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py index c4b64445f701..77479ab691b9 100644 --- 
a/paimon-python/pypaimon/filesystem/pyarrow_file_io.py +++ b/paimon-python/pypaimon/filesystem/pyarrow_file_io.py @@ -41,6 +41,12 @@ from pypaimon.table.row.row_kind import RowKind from pypaimon.write.blob_format_writer import BlobFormatWriter +try: + from pypaimon.filesystem.jindo_file_system_handler import JindoFileSystemHandler, JINDO_AVAILABLE +except ImportError: + JINDO_AVAILABLE = False + JindoFileSystemHandler = None + def _pyarrow_lt_7(): return parse(pyarrow.__version__) < parse("7.0.0") @@ -56,9 +62,14 @@ def __init__(self, path: str, catalog_options: Options): self.uri_reader_factory = UriReaderFactory(catalog_options) self._is_oss = scheme in {"oss"} self._oss_bucket = None + self._oss_impl = self.properties.get(OssOptions.OSS_IMPL) if self._is_oss: self._oss_bucket = self._extract_oss_bucket(path) - self.filesystem = self._initialize_oss_fs(path) + # Try to use JindoFileSystem if available, otherwise fall back to S3FileSystem + if self._oss_impl == "jindo" and JINDO_AVAILABLE: + self.filesystem = self._initialize_jindo_fs(path) + else: + self.filesystem = self._initialize_oss_fs(path) elif scheme in {"s3", "s3a", "s3n"}: self.filesystem = self._initialize_s3_fs() elif scheme in {"hdfs", "viewfs"}: @@ -116,6 +127,13 @@ def _extract_oss_bucket(self, location) -> str: raise ValueError("Invalid OSS URI without bucket: {}".format(location)) return bucket + def _initialize_jindo_fs(self, path) -> FileSystem: + """Initialize JindoFileSystem for OSS access.""" + self.logger.info(f"Initializing JindoFileSystem for OSS access: {path}") + root_path = f"oss://{self._oss_bucket}/" + fs_handler = JindoFileSystemHandler(root_path, self.properties) + return pafs.PyFileSystem(fs_handler) + def _initialize_oss_fs(self, path) -> FileSystem: client_kwargs = { "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID), @@ -184,7 +202,9 @@ def new_input_stream(self, path: str): def new_output_stream(self, path: str): path_str = self.to_filesystem_path(path) - if 
self._is_oss and not self._pyarrow_gte_7: + if self._is_oss and self._oss_impl == "jindo" and JINDO_AVAILABLE: + pass + elif self._is_oss and not self._pyarrow_gte_7: # For PyArrow 6.x + OSS, path_str is already just the key part if '/' in path_str: parent_dir = '/'.join(path_str.split('/')[:-1]) @@ -546,6 +566,11 @@ def to_filesystem_path(self, path: str) -> str: path_part = normalized_path.lstrip('/') return f"{drive_letter}:/{path_part}" if path_part else f"{drive_letter}:" + if self._is_oss and self._oss_impl == "jindo" and JINDO_AVAILABLE: + # For JindoFileSystem, pass key only + path_part = normalized_path.lstrip('/') + return path_part if path_part else '.' + if isinstance(self.filesystem, S3FileSystem): if parsed.scheme: if parsed.netloc: diff --git a/paimon-python/pypaimon/tests/file_io_test.py b/paimon-python/pypaimon/tests/file_io_test.py index ba91f94a18e6..5cc4d7a82164 100644 --- a/paimon-python/pypaimon/tests/file_io_test.py +++ b/paimon-python/pypaimon/tests/file_io_test.py @@ -67,7 +67,9 @@ def test_filesystem_path_conversion(self): lt7 = _pyarrow_lt_7() oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({ - OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com' + OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com', + OssOptions.OSS_ACCESS_KEY_ID.key(): 'test-key', + OssOptions.OSS_ACCESS_KEY_SECRET.key(): 'test-secret', })) got = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt") self.assertEqual(got, "path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt") @@ -286,7 +288,9 @@ def test_exists_does_not_catch_exception(self): file_io.delete_directory_quietly("file:///some/path") oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({ - OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com' + OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com', + OssOptions.OSS_ACCESS_KEY_ID.key(): 'test-key', + OssOptions.OSS_ACCESS_KEY_SECRET.key(): 'test-secret', })) mock_fs = MagicMock() mock_fs.get_file_info.return_value = [ diff --git 
a/paimon-python/pypaimon/tests/jindo_file_system_test.py b/paimon-python/pypaimon/tests/jindo_file_system_test.py new file mode 100644 index 000000000000..b6f168ac5823 --- /dev/null +++ b/paimon-python/pypaimon/tests/jindo_file_system_test.py @@ -0,0 +1,524 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+################################################################################ +import os +import unittest +import uuid + +import pyarrow.fs as pafs + +from pyarrow.fs import PyFileSystem +from pypaimon.common.options import Options +from pypaimon.common.options.config import OssOptions +from pypaimon.filesystem.jindo_file_system_handler import JindoFileSystemHandler, JINDO_AVAILABLE + + +class JindoFileSystemTest(unittest.TestCase): + """Test cases for JindoFileSystem.""" + + def setUp(self): + """Set up test fixtures.""" + if not JINDO_AVAILABLE: + self.skipTest("pyjindo is not available") + + # Get OSS credentials from environment variables or use test values + bucket = os.environ.get("OSS_TEST_BUCKET") + access_key_id = os.environ.get("OSS_ACCESS_KEY_ID") + access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET") + endpoint = os.environ.get("OSS_ENDPOINT") + if not bucket: + self.skipTest("test bucket is not configured") + return + if not access_key_id: + self.skipTest("test access key id is not configured") + return + if not access_key_secret: + self.skipTest("test access key secret is not configured") + return + if not endpoint: + self.skipTest("test endpoint is not configured") + return + self.root_path = f"oss://{bucket}/" + + self.catalog_options = Options({ + OssOptions.OSS_ACCESS_KEY_ID.key(): access_key_id, + OssOptions.OSS_ACCESS_KEY_SECRET.key(): access_key_secret, + OssOptions.OSS_ENDPOINT.key(): endpoint, + }) + + # Create JindoFileSystemHandler instance + fs_handler = JindoFileSystemHandler(self.root_path, self.catalog_options) + self.fs = PyFileSystem(fs_handler) + + # Create unique test prefix to avoid conflicts + self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/" + + def tearDown(self): + """Clean up test files and directories.""" + # Delete the entire test prefix directory + test_dir = f"{self.root_path}{self.test_prefix}" + try: + file_info = self.fs.get_file_info(test_dir) + if file_info.type == pafs.FileType.Directory: + try: + 
self.fs.delete_dir(test_dir) + except Exception: + pass + except Exception: + pass # Ignore cleanup errors + + def _get_test_path(self, name: str) -> str: + """Get a test path under test_prefix.""" + return f"{self.root_path}{self.test_prefix}{name}" + + def test_get_root_file_info(self): + """Test get_file_info for root path.""" + # Verify directory exists using get_file_info + file_info = self.fs.get_file_info(self.root_path) + + # Verify the returned file info + self.assertIsInstance(file_info, pafs.FileInfo) + self.assertEqual(file_info.type, pafs.FileType.Directory) + + def test_create_dir_recursive(self): + """Test create_dir with recursive=True.""" + test_dir = self._get_test_path("nested/deep/dir/") + + # Create nested directory + self.fs.create_dir(test_dir, recursive=True) + + # Verify directory exists + file_info = self.fs.get_file_info(test_dir) + self.assertEqual(file_info.type, pafs.FileType.Directory) + + def test_write_and_read_small_file(self): + """Test writing and reading a small file (10 bytes) with random data.""" + test_file = self._get_test_path("small-file.txt") + test_data = os.urandom(10) + + # Write file + out_stream = self.fs.open_output_stream(test_file) + out_stream.write(test_data) + out_stream.close() + + # Read file + with self.fs.open_input_file(test_file) as in_file: + read_data = in_file.read() + + # Verify data correctness + self.assertEqual(test_data, read_data) + self.assertEqual(len(read_data), 10) + + # Verify file info + file_info = self.fs.get_file_info(test_file) + self.assertEqual(file_info.type, pafs.FileType.File) + self.assertEqual(file_info.size, 10) + + def test_write_and_read_small_file_with_context(self): + """Test writing and reading a small file (10 bytes) with random data using context manager.""" + test_file = self._get_test_path("small-file-with-context.txt") + test_data = os.urandom(10) + + # Write file using context manager + with self.fs.open_output_stream(test_file) as out_stream: + 
out_stream.write(test_data) + + # Read file + with self.fs.open_input_file(test_file) as in_file: + read_data = in_file.read() + + # Verify data correctness + self.assertEqual(test_data, read_data) + self.assertEqual(len(read_data), 10) + + # Verify file info + file_info = self.fs.get_file_info(test_file) + self.assertEqual(file_info.type, pafs.FileType.File) + self.assertEqual(file_info.size, 10) + + def test_write_and_read_large_file(self): + """Test writing and reading a large file (20MB) with random data.""" + test_file = self._get_test_path("large-file.bin") + file_size = 20 * 1024 * 1024 # 20MB + test_data = os.urandom(file_size) + + # Write file in chunks + chunk_size = 1024 * 1024 # 1MB chunks + out_stream = self.fs.open_output_stream(test_file) + for i in range(0, len(test_data), chunk_size): + chunk = test_data[i:i + chunk_size] + out_stream.write(chunk) + out_stream.close() + + # Verify file info + file_info = self.fs.get_file_info(test_file) + self.assertEqual(file_info.type, pafs.FileType.File) + self.assertEqual(file_info.size, file_size) + + # Test JindoInputFile all methods + with self.fs.open_input_file(test_file) as in_file: + # Test tell() - should be at position 0 initially + self.assertEqual(in_file.tell(), 0) + + # Test read() with specific size + chunk1 = in_file.read(1024) + self.assertEqual(len(chunk1), 1024) + self.assertEqual(in_file.tell(), 1024) + self.assertEqual(chunk1, test_data[0:1024]) + + # Test seek() - seek to middle of file + middle_pos = file_size // 2 + in_file.seek(middle_pos) + self.assertEqual(in_file.tell(), middle_pos) + + # Test read_at() + read_at_data = in_file.read_at(1024, 0) + self.assertEqual(len(read_at_data), 1024) + self.assertEqual(read_at_data, test_data[0:1024]) + + # Test read_at() at different offset + offset = file_size - 2048 + read_at_data2 = in_file.read_at(1024, offset) + self.assertEqual(len(read_at_data2), 1024) + self.assertEqual(read_at_data2, test_data[offset:offset + 1024]) + + # Test seek() to 
end + in_file.seek(file_size) + self.assertEqual(in_file.tell(), file_size) + + # Test read() at end - should return empty bytes + empty_data = in_file.read(100) + self.assertEqual(len(empty_data), 0) + + # Test read() without size - read remaining + in_file.seek(0) + all_data = in_file.read() + self.assertEqual(len(all_data), file_size) + self.assertEqual(all_data, test_data) + + # Verify complete file read + with self.fs.open_input_file(test_file) as in_file: + complete_data = in_file.read() + self.assertEqual(complete_data, test_data) + self.assertEqual(len(complete_data), file_size) + + def test_get_file_info_single_path(self): + """Test get_file_info with single path.""" + test_file = self._get_test_path("info-test.txt") + test_data = b"test content" + + # Write file + out_stream = self.fs.open_output_stream(test_file) + out_stream.write(test_data) + out_stream.close() + + # Get file info + file_info = self.fs.get_file_info(test_file) + self.assertIsInstance(file_info, pafs.FileInfo) + self.assertEqual(file_info.type, pafs.FileType.File) + self.assertEqual(file_info.size, len(test_data)) + + def test_get_file_info_list(self): + """Test get_file_info with list of paths.""" + test_file = self._get_test_path("info-test1.txt") + test_dir = self._get_test_path("dir1") + + # Write files + out_stream = self.fs.open_output_stream(test_file) + out_stream.write(b"content1") + out_stream.close() + self.fs.create_dir(test_dir) + + # Get file info for list + file_infos = self.fs.get_file_info([test_file, test_dir]) + self.assertIsInstance(file_infos, list) + self.assertEqual(len(file_infos), 2) + self.assertEqual(file_infos[0].type, pafs.FileType.File) + self.assertTrue(test_file in file_infos[0].path) + self.assertEqual(file_infos[1].type, pafs.FileType.Directory) + self.assertTrue(test_dir in file_infos[1].path) + + def test_get_file_info_with_selector(self): + """Test get_file_info with FileSelector.""" + test_dir = self._get_test_path("selector-test/") + test_file1 = 
self._get_test_path("selector-test/file1.txt") + test_file2 = self._get_test_path("selector-test/file2.txt") + + # Create files + self.fs.create_dir(test_dir) + out_stream1 = self.fs.open_output_stream(test_file1) + out_stream1.write(b"content1") + out_stream1.close() + out_stream2 = self.fs.open_output_stream(test_file2) + out_stream2.write(b"content2") + out_stream2.close() + + # Test non-recursive listing + selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False) + file_infos = self.fs.get_file_info(selector) + self.assertIsInstance(file_infos, list) + self.assertEqual(len(file_infos), 2) + + # Verify we got the files + file_names = [info.path for info in file_infos] + self.assertTrue(any("file1.txt" in name for name in file_names)) + self.assertTrue(any("file2.txt" in name for name in file_names)) + + def test_get_file_info_with_selector_recursive(self): + """Test get_file_info with FileSelector recursive=True.""" + test_dir = self._get_test_path("selector-recursive/") + test_subdir = self._get_test_path("selector-recursive/subdir/") + test_file1 = self._get_test_path("selector-recursive/file1.txt") + test_file2 = self._get_test_path("selector-recursive/subdir/file2.txt") + + # Create directory structure + self.fs.create_dir(test_dir) + self.fs.create_dir(test_subdir) + out_stream1 = self.fs.open_output_stream(test_file1) + out_stream1.write(b"content1") + out_stream1.close() + out_stream2 = self.fs.open_output_stream(test_file2) + out_stream2.write(b"content2") + out_stream2.close() + + # Test recursive listing + selector = pafs.FileSelector(test_dir, recursive=True, allow_not_found=False) + file_infos = self.fs.get_file_info(selector) + self.assertIsInstance(file_infos, list) + self.assertEqual(len(file_infos), 3) + + def test_get_file_info_not_found(self): + """Test get_file_info for non-existent file.""" + non_existent = self._get_test_path("non-existent-file.txt") + + file_info = self.fs.get_file_info(non_existent) + 
self.assertEqual(file_info.type, pafs.FileType.NotFound) + + # Try to open non-existent file should raise FileNotFoundError + with self.assertRaises(FileNotFoundError): + with self.fs.open_input_file(non_existent) as in_file: + in_file.read() + + def test_get_file_info_selector_allow_not_found(self): + """Test get_file_info with FileSelector allow_not_found=True.""" + non_existent_dir = self._get_test_path("non-existent-dir/") + + # Test with allow_not_found=True + selector = pafs.FileSelector(non_existent_dir, recursive=False, allow_not_found=True) + file_infos = self.fs.get_file_info(selector) + self.assertIsInstance(file_infos, list) + self.assertEqual(len(file_infos), 0) + + def test_delete_file(self): + """Test delete_file method.""" + test_file = self._get_test_path("delete-test.txt") + + # Create file + out_stream = self.fs.open_output_stream(test_file) + out_stream.write(b"test content") + out_stream.close() + + # Verify file exists + file_info = self.fs.get_file_info(test_file) + self.assertEqual(file_info.type, pafs.FileType.File) + + # Delete file + self.fs.delete_file(test_file) + + # Verify file is deleted - should raise FileNotFoundError when accessing + with self.assertRaises(FileNotFoundError): + with self.fs.open_input_file(test_file) as in_file: + in_file.read() + + def test_delete_dir(self): + """Test delete_dir method.""" + test_dir = self._get_test_path("delete-dir/") + + # Create directory + self.fs.create_dir(test_dir) + + # Verify directory exists + file_info = self.fs.get_file_info(test_dir) + self.assertEqual(file_info.type, pafs.FileType.Directory) + + # Delete directory + self.fs.delete_dir(test_dir) + + # Verify directory is deleted - should raise FileNotFoundError when accessing + with self.assertRaises(FileNotFoundError): + # Try to list directory contents + selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False) + self.fs.get_file_info(selector) + + def test_delete_dir_contents(self): + """Test 
delete_dir_contents method.""" + test_dir = self._get_test_path("delete-contents/") + test_file1 = self._get_test_path("delete-contents/file1.txt") + test_file2 = self._get_test_path("delete-contents/file2.txt") + test_subdir = self._get_test_path("delete-contents/subdir/") + test_file3 = self._get_test_path("delete-contents/subdir/file3.txt") + + # Create directory structure + self.fs.create_dir(test_dir) + out_stream1 = self.fs.open_output_stream(test_file1) + out_stream1.write(b"content1") + out_stream1.close() + out_stream2 = self.fs.open_output_stream(test_file2) + out_stream2.write(b"content2") + out_stream2.close() + self.fs.create_dir(test_subdir) + out_stream3 = self.fs.open_output_stream(test_file3) + out_stream3.write(b"content3") + out_stream3.close() + + # Delete directory contents + self.fs.delete_dir_contents(test_dir) + + # Verify directory still exists but is empty + file_info = self.fs.get_file_info(test_dir) + self.assertEqual(file_info.type, pafs.FileType.Directory) + selector = pafs.FileSelector(test_dir, recursive=False, allow_not_found=False) + file_infos = self.fs.get_file_info(selector) + self.assertEqual(len(file_infos), 0) + + # Verify files are deleted - should raise FileNotFoundError when accessing + with self.assertRaises(FileNotFoundError): + with self.fs.open_input_file(test_file1) as in_file: + in_file.read() + with self.assertRaises(FileNotFoundError): + with self.fs.open_input_file(test_file2) as in_file: + in_file.read() + with self.assertRaises(FileNotFoundError): + with self.fs.open_input_file(test_file3) as in_file: + in_file.read() + + def test_move_file(self): + """Test move method for file.""" + src_file = self._get_test_path("move-src.txt") + dst_file = self._get_test_path("move-dst.txt") + test_data = b"move test content" + + # Create source file + out_stream = self.fs.open_output_stream(src_file) + out_stream.write(test_data) + out_stream.close() + + # Move file + self.fs.move(src_file, dst_file) + + # Verify source is 
gone - should raise FileNotFoundError when accessing + with self.assertRaises(FileNotFoundError): + with self.fs.open_input_file(src_file) as in_file: + in_file.read() + + # Verify destination exists with correct content + dst_info = self.fs.get_file_info(dst_file) + self.assertEqual(dst_info.type, pafs.FileType.File) + + with self.fs.open_input_file(dst_file) as in_file: + read_data = in_file.read() + self.assertEqual(read_data, test_data) + + def test_move_directory(self): + """Test move method for directory.""" + src_dir = self._get_test_path("move-src-dir/") + dst_dir = self._get_test_path("move-dst-dir/") + src_file = self._get_test_path("move-src-dir/file.txt") + + # Create source directory and file + self.fs.create_dir(src_dir) + out_stream = self.fs.open_output_stream(src_file) + out_stream.write(b"test content") + out_stream.close() + + # Move directory + self.fs.move(src_dir, dst_dir) + + # Verify source is gone - should raise FileNotFoundError when accessing + with self.assertRaises(FileNotFoundError): + # Try to list directory contents + selector = pafs.FileSelector(src_dir, recursive=False, allow_not_found=False) + self.fs.get_file_info(selector) + + # Verify destination exists + dst_info = self.fs.get_file_info(dst_dir) + self.assertEqual(dst_info.type, pafs.FileType.Directory) + + def test_copy_file(self): + """Test copy_file method.""" + src_file = self._get_test_path("copy-src.txt") + dst_file = self._get_test_path("copy-dst.txt") + test_data = os.urandom(1024 * 1024) # 1MB random data + + # Create source file + out_stream = self.fs.open_output_stream(src_file) + out_stream.write(test_data) + out_stream.close() + + # Copy file + self.fs.copy_file(src_file, dst_file) + + # Verify both files exist + src_info = self.fs.get_file_info(src_file) + self.assertEqual(src_info.type, pafs.FileType.File) + dst_info = self.fs.get_file_info(dst_file) + self.assertEqual(dst_info.type, pafs.FileType.File) + + # Verify destination content matches source + with 
self.fs.open_input_file(dst_file) as in_file: + read_data = in_file.read() + self.assertEqual(read_data, test_data) + self.assertEqual(len(read_data), len(test_data)) + + def test_delete_nonexistent_file(self): + """Test deleting non-existent file.""" + non_existent = self._get_test_path("non-existent-delete.txt") + + # Try to delete non-existent file + with self.assertRaises(IOError): + self.fs.delete_file(non_existent) + + def test_multiple_sequential_reads(self): + """Test multiple sequential reads from same file.""" + test_file = self._get_test_path("sequential-read.txt") + test_data = os.urandom(10000) # 10KB + + # Write file + out_stream = self.fs.open_output_stream(test_file) + out_stream.write(test_data) + out_stream.close() + + # Read in chunks sequentially + with self.fs.open_input_file(test_file) as in_file: + chunk1 = in_file.read(1000) + chunk2 = in_file.read(2000) + chunk3 = in_file.read(3000) + chunk4 = in_file.read() # Read rest + + # Verify all chunks + self.assertEqual(chunk1, test_data[0:1000]) + self.assertEqual(chunk2, test_data[1000:3000]) + self.assertEqual(chunk3, test_data[3000:6000]) + self.assertEqual(chunk4, test_data[6000:]) + + # Verify total length + total_read = len(chunk1) + len(chunk2) + len(chunk3) + len(chunk4) + self.assertEqual(total_read, len(test_data)) + + +if __name__ == '__main__': + unittest.main() diff --git a/paimon-python/pypaimon/tests/oss_file_io_test.py b/paimon-python/pypaimon/tests/oss_file_io_test.py new file mode 100644 index 000000000000..f9602c975ef8 --- /dev/null +++ b/paimon-python/pypaimon/tests/oss_file_io_test.py @@ -0,0 +1,362 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
import unittest
import uuid

import pyarrow.fs as pafs

from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions
from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO


class OSSFileIOTest(unittest.TestCase):
    """Integration tests for PyArrowFileIO backed by an Alibaba Cloud OSS bucket.

    All credentials and the target bucket come from environment variables
    (OSS_TEST_BUCKET, OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT,
    optionally OSS_IMPL); every test is skipped if any of them is unset.
    Each run works under a unique random prefix so concurrent runs do not
    interfere, and the prefix is removed in tearDown.
    """

    def setUp(self):
        """Set up test fixtures from environment variables, skipping if absent."""
        # Get OSS credentials from environment variables
        self.bucket = os.environ.get("OSS_TEST_BUCKET")
        access_key_id = os.environ.get("OSS_ACCESS_KEY_ID")
        access_key_secret = os.environ.get("OSS_ACCESS_KEY_SECRET")
        endpoint = os.environ.get("OSS_ENDPOINT")
        # "default" selects the stock OSS implementation; "jindo" is the alternative
        oss_impl = os.environ.get("OSS_IMPL", "default")

        # Skip (not fail) when the environment is not configured for OSS testing.
        if not self.bucket:
            self.skipTest("test bucket is not configured")
            return
        if not access_key_id:
            self.skipTest("test access key id is not configured")
            return
        if not access_key_secret:
            self.skipTest("test access key secret is not configured")
            return
        if not endpoint:
            self.skipTest("test endpoint is not configured")
            return

        self.root_path = f"oss://{self.bucket}/"

        self.catalog_options = Options({
            OssOptions.OSS_ACCESS_KEY_ID.key(): access_key_id,
            OssOptions.OSS_ACCESS_KEY_SECRET.key(): access_key_secret,
            OssOptions.OSS_ENDPOINT.key(): endpoint,
            OssOptions.OSS_IMPL.key(): oss_impl,
        })

        # Create PyArrowFileIO instance
        self.file_io = PyArrowFileIO(self.root_path, self.catalog_options)

        # Create unique test prefix to avoid conflicts between runs
        self.test_prefix = f"test-{uuid.uuid4().hex[:8]}/"

    def tearDown(self):
        """Clean up test files and directories created under the test prefix."""
        # Delete the entire test prefix directory
        test_dir = f"{self.root_path}{self.test_prefix}"
        try:
            if self.file_io.exists(test_dir):
                self.file_io.delete(test_dir, recursive=True)
        except Exception:
            pass  # Ignore cleanup errors — best-effort teardown must not mask test results

    def _get_test_path(self, name: str) -> str:
        """Get a full oss:// URI for *name* under this run's unique test prefix."""
        return f"{self.root_path}{self.test_prefix}{name}"

    def test_get_file_status_directory(self):
        """Test get_file_status for a directory."""
        # Create a test directory
        test_dir = self._get_test_path("test-dir/")
        self.file_io.mkdirs(test_dir)
        # Get file status
        file_status = self.file_io.get_file_status(test_dir)
        # Verify the returned file status reports a directory
        self.assertIsNotNone(file_status)
        self.assertEqual(file_status.type, pafs.FileType.Directory)

    def test_new_input_stream_read(self):
        """Test new_input_stream and read: full reads, partial reads, seeks, context manager."""
        # Create test data
        test_data = b"Hello, World! This is a test file for OSS input stream."
        test_file = self._get_test_path("test-input-stream.txt")

        # Write test data to file
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(test_data)

        # Test new_input_stream
        input_stream = self.file_io.new_input_stream(test_file)
        self.assertIsNotNone(input_stream)

        # Test read without nbytes (read all)
        input_stream.seek(0)
        read_data = input_stream.read()
        self.assertEqual(read_data, test_data)

        # Test read with nbytes
        input_stream.seek(0)
        read_partial = input_stream.read(5)
        self.assertEqual(read_partial, b"Hello")

        # Test read more bytes — position advances past the first 5 bytes
        read_partial2 = input_stream.read(7)
        self.assertEqual(read_partial2, b", World")

        # Test read remaining
        read_remaining = input_stream.read()
        self.assertEqual(read_remaining, b"! This is a test file for OSS input stream.")

        # Verify complete data after seeking back to the start
        input_stream.seek(0)
        complete_data = input_stream.read()
        self.assertEqual(complete_data, test_data)

        # Close the stream
        input_stream.close()

        # Test context manager
        with self.file_io.new_input_stream(test_file) as input_stream2:
            data = input_stream2.read()
            self.assertEqual(data, test_data)

    def test_new_input_stream_read_large_file(self):
        """Test new_input_stream with a 1MB file: chunked reads and positional read_at."""
        # Create larger test data (1MB)
        test_data = b"X" * (1024 * 1024)
        test_file = self._get_test_path("test-large-input-stream.bin")

        # Write test data
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(test_data)

        # Test reading in chunks
        chunk_size = 64 * 1024  # 64KB chunks
        with self.file_io.new_input_stream(test_file) as input_stream:
            read_chunks = []
            while True:
                chunk = input_stream.read(chunk_size)
                if not chunk:
                    break
                read_chunks.append(chunk)

            # Verify all data was read
            read_data = b''.join(read_chunks)
            self.assertEqual(len(read_data), len(test_data))
            self.assertEqual(read_data, test_data)

        # Test read_at method if available (positional read; not all streams expose it)
        with self.file_io.new_input_stream(test_file) as input_stream:
            if hasattr(input_stream, 'read_at'):
                # Read from middle of file
                offset = len(test_data) // 2
                read_at_data = input_stream.read_at(1024, offset)
                self.assertEqual(len(read_at_data), 1024)
                self.assertEqual(read_at_data, test_data[offset:offset + 1024])

    def test_new_input_stream_file_not_found(self):
        """Test new_input_stream with non-existent file raises FileNotFoundError."""
        non_existent_file = self._get_test_path("non-existent-file.txt")

        with self.assertRaises(FileNotFoundError):
            self.file_io.new_input_stream(non_existent_file)

    def test_exists(self):
        """Test exists returns False (not raising) for a missing path, unlike get_file_status."""
        missing_uri = self._get_test_path("nonexistent_xyz")
        self.assertFalse(self.file_io.exists(missing_uri))
        with self.assertRaises(FileNotFoundError):
            self.file_io.get_file_status(missing_uri)

    def test_write_file_with_overwrite_flag(self):
        """Test write_file honors the overwrite flag."""
        test_file_uri = self._get_test_path("overwrite_test.txt")

        # 1) Write to a new file with default overwrite=False
        self.file_io.write_file(test_file_uri, "first content")
        self.assertTrue(self.file_io.exists(test_file_uri))
        content = self.file_io.read_file_utf8(test_file_uri)
        self.assertEqual(content, "first content")

        # 2) Attempt to write again with overwrite=False should raise FileExistsError
        with self.assertRaises(FileExistsError):
            self.file_io.write_file(test_file_uri, "second content", overwrite=False)

        # Ensure content is unchanged after the rejected write
        content = self.file_io.read_file_utf8(test_file_uri)
        self.assertEqual(content, "first content")

        # 3) Write with overwrite=True should replace the content
        self.file_io.write_file(test_file_uri, "overwritten content", overwrite=True)
        content = self.file_io.read_file_utf8(test_file_uri)
        self.assertEqual(content, "overwritten content")

    def test_exists_does_not_catch_exception(self):
        """Test that exists gives true/false answers rather than swallowing errors."""
        test_file = self._get_test_path("test_file.txt")

        # Write a test file
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test")
        self.assertTrue(self.file_io.exists(test_file))
        self.assertFalse(self.file_io.exists(self._get_test_path("nonexistent.txt")))

    def test_delete_non_empty_directory_raises_error(self):
        """Test that non-recursive delete raises for a non-empty directory."""
        test_dir = self._get_test_path("test_dir/")
        test_file = self._get_test_path("test_dir/test_file.txt")

        # Create directory and file
        self.file_io.mkdirs(test_dir)
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test")

        with self.assertRaises(OSError) as context:
            self.file_io.delete(test_dir, recursive=False)
        self.assertIn("is not empty", str(context.exception))

    def test_delete_returns_false_when_file_not_exists(self):
        """Test that delete returns False (does not raise) when the path does not exist."""
        result = self.file_io.delete(self._get_test_path("nonexistent.txt"))
        self.assertFalse(result, "delete() should return False when file does not exist")

        result = self.file_io.delete(self._get_test_path("nonexistent_dir"), recursive=False)
        self.assertFalse(result, "delete() should return False when directory does not exist")

    def test_mkdirs_raises_error_when_path_is_file(self):
        """Test that mkdirs raises FileExistsError when the path is an existing file."""
        test_file = self._get_test_path("test_file.txt")

        # Create a file
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test")

        with self.assertRaises(FileExistsError) as context:
            self.file_io.mkdirs(test_file)
        self.assertIn("is not a directory", str(context.exception))

    def test_rename_returns_false_when_dst_exists(self):
        """Test that rename returns False when destination exists."""
        src_file = self._get_test_path("src.txt")
        dst_file = self._get_test_path("dst.txt")

        # Create source and destination files
        with self.file_io.new_output_stream(src_file) as out_stream:
            out_stream.write(b"src")
        with self.file_io.new_output_stream(dst_file) as out_stream:
            out_stream.write(b"dst")

        result = self.file_io.rename(src_file, dst_file)
        self.assertFalse(result)

    def test_get_file_status_raises_error_when_file_not_exists(self):
        """Test that status/size/is_dir queries raise FileNotFoundError for missing paths."""
        with self.assertRaises(FileNotFoundError) as context:
            self.file_io.get_file_status(self._get_test_path("nonexistent.txt"))
        self.assertIn("does not exist", str(context.exception))

        test_file = self._get_test_path("test_file.txt")
        with self.file_io.new_output_stream(test_file) as out_stream:
            out_stream.write(b"test content")

        # Status of an existing file reports type File and a size
        file_info = self.file_io.get_file_status(test_file)
        self.assertEqual(file_info.type, pafs.FileType.File)
        self.assertIsNotNone(file_info.size)

        with self.assertRaises(FileNotFoundError) as context:
            self.file_io.get_file_size(self._get_test_path("nonexistent.txt"))
        self.assertIn("does not exist", str(context.exception))

        with self.assertRaises(FileNotFoundError) as context:
            self.file_io.is_dir(self._get_test_path("nonexistent_dir"))
        self.assertIn("does not exist", str(context.exception))

    def test_copy_file(self):
        """Test copy_file: overwrite flag semantics and parent-directory creation."""
        source_file = self._get_test_path("source.txt")
        target_file = self._get_test_path("target.txt")

        # Create source file
        with self.file_io.new_output_stream(source_file) as out_stream:
            out_stream.write(b"source content")

        # Test 1: Raises FileExistsError when target exists and overwrite=False
        with self.file_io.new_output_stream(target_file) as out_stream:
            out_stream.write(b"target content")

        with self.assertRaises(FileExistsError) as context:
            self.file_io.copy_file(source_file, target_file, overwrite=False)
        self.assertIn("already exists", str(context.exception))

        # Verify target content unchanged by the failed copy
        with self.file_io.new_input_stream(target_file) as in_stream:
            content = in_stream.read()
            self.assertEqual(content, b"target content")

        # Test 2: Overwrites when overwrite=True
        self.file_io.copy_file(source_file, target_file, overwrite=True)
        with self.file_io.new_input_stream(target_file) as in_stream:
            content = in_stream.read()
            self.assertEqual(content, b"source content")

        # Test 3: Creates parent directory if it doesn't exist
        target_file_in_subdir = self._get_test_path("subdir/target.txt")
        self.file_io.copy_file(source_file, target_file_in_subdir, overwrite=False)
        self.assertTrue(self.file_io.exists(target_file_in_subdir))
        with self.file_io.new_input_stream(target_file_in_subdir) as in_stream:
            content = in_stream.read()
            self.assertEqual(content, b"source content")

    def test_try_to_write_atomic(self):
        """Test try_to_write_atomic: refuses directories, succeeds on plain files."""
        target_dir = self._get_test_path("target_dir/")
        normal_file = self._get_test_path("normal_file.txt")

        # Create target directory; atomic write to a directory must fail cleanly
        self.file_io.mkdirs(target_dir)
        self.assertFalse(
            self.file_io.try_to_write_atomic(target_dir, "test content"),
            "PyArrowFileIO should return False when target is a directory")

        # Verify no file was created inside the directory
        # List directory contents to verify it's empty
        selector = pafs.FileSelector(self.file_io.to_filesystem_path(target_dir), recursive=False, allow_not_found=True)
        dir_contents = self.file_io.filesystem.get_file_info(selector)
        self.assertEqual(len(dir_contents), 0, "No file should be created inside the directory")

        # A plain file path succeeds and the content round-trips
        self.assertTrue(self.file_io.try_to_write_atomic(normal_file, "test content"))
        content = self.file_io.read_file_utf8(normal_file)
        self.assertEqual(content, "test content")

        # Delete and test again — behavior must be repeatable, not first-call-only
        self.file_io.delete(normal_file)
        self.assertFalse(
            self.file_io.try_to_write_atomic(target_dir, "test content"),
            "PyArrowFileIO should return False when target is a directory")

        # Verify no file was created inside the directory on the second attempt either
        dir_contents = self.file_io.filesystem.get_file_info(selector)
        self.assertEqual(len(dir_contents), 0, "No file should be created inside the directory")

        self.assertTrue(self.file_io.try_to_write_atomic(normal_file, "test content"))
        content = self.file_io.read_file_utf8(normal_file)
        self.assertEqual(content, "test content")


if __name__ == '__main__':
    unittest.main()