diff --git a/docs/content/pypaimon/fuse-support.md b/docs/content/pypaimon/fuse-support.md new file mode 100644 index 000000000000..41ea330e68bf --- /dev/null +++ b/docs/content/pypaimon/fuse-support.md @@ -0,0 +1,93 @@ +--- +title: "FUSE Support" +weight: 7 +type: docs +aliases: + - /pypaimon/fuse-support.html +--- + +# FUSE Support + +When using PyPaimon REST Catalog to access remote object storage (such as OSS, S3, or HDFS), data access typically goes through remote storage SDKs. However, in scenarios where remote storage paths are mounted locally via FUSE (Filesystem in Userspace), users can access data directly through local filesystem paths for better performance. + +This feature enables PyPaimon to use local file access when FUSE mount is available, bypassing remote storage SDKs. + +## Configuration + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `fuse.local-path.enabled` | Boolean | `false` | Whether to enable FUSE local path mapping | +| `fuse.local-path.root` | String | (none) | FUSE mounted local root path, e.g., `/mnt/fuse/warehouse` | +| `fuse.local-path.validation-mode` | String | `strict` | Validation mode: `strict`, `warn`, or `none` | + +## Usage + +```python +from pypaimon import CatalogFactory + +catalog_options = { + 'metastore': 'rest', + 'uri': 'http://rest-server:8080', + 'warehouse': 'oss://my-catalog/', + 'token.provider': 'xxx', + + # FUSE local path configuration + 'fuse.local-path.enabled': 'true', + 'fuse.local-path.root': '/mnt/fuse/warehouse', + 'fuse.local-path.validation-mode': 'strict' +} + +catalog = CatalogFactory.create(catalog_options) +``` + +## Validation Modes + +Validation is performed on first data access to verify FUSE mount correctness. The `validation-mode` controls behavior when the local path does not exist: + +| Mode | Behavior | Use Case | +|------|----------|----------| +| `strict` | Throw exception, block operation | Production, safety first | +| `warn` | Log warning, fallback to default FileIO | Testing, compatibility first | +| `none` | Skip validation, use directly | Trusted environment, performance first | + +**Note**: Configuration errors (e.g., `fuse.local-path.enabled=true` but `fuse.local-path.root` not configured) will throw exceptions directly, regardless of validation mode. + +## How It Works + +1. When `fuse.local-path.enabled=true`, PyPaimon attempts to use local file access +2. On first data access, validation is triggered (unless mode is `none`) +3. Validation fetches the `default` database location and converts it to local path +4. If local path exists, subsequent data access uses `LocalFileIO` +5. If validation fails, behavior depends on `validation-mode` + +## Example Scenario + +Assume you have: +- Remote storage: `oss://my-catalog/` +- FUSE mount: `/mnt/fuse/warehouse` (mounted to `oss://my-catalog/`) + +```python +from pypaimon import CatalogFactory + +# Create catalog with FUSE enabled +catalog = CatalogFactory.create({ + 'metastore': 'rest', + 'uri': 'http://rest-server:8080', + 'warehouse': 'oss://my-catalog/', + 'fuse.local-path.enabled': 'true', + 'fuse.local-path.root': '/mnt/fuse/warehouse' +}) + +# When reading table data, PyPaimon will: +# 1. Convert "oss://my-catalog/db/table" to "/mnt/fuse/warehouse/db/table" +# 2. Use LocalFileIO to read from local path +# 3. Bypass remote OSS SDK for better performance +table = catalog.get_table('db.table') +reader = table.new_read_builder().new_read() +``` + +## Limitations + +- Only catalog-level FUSE mount is supported (single `fuse.local-path.root` configuration) +- Validation only checks if local path exists, not data consistency +- If FUSE mount becomes unavailable after validation, file operations may fail diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py index 943d88840e2a..58f3e21f3d23 100644 --- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py +++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py @@ -15,7 +15,9 @@ See the License for the specific language governing permissions and limitations under the License. """ +import logging from typing import Any, Callable, Dict, List, Optional, Union +from urllib.parse import urlparse from pypaimon.api.api_response import GetTableResponse, PagedList from pypaimon.api.rest_api import RESTApi @@ -32,10 +34,11 @@ from pypaimon.catalog.rest.property_change import PropertyChange from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO from pypaimon.catalog.rest.table_metadata import TableMetadata -from pypaimon.common.options.config import CatalogOptions +from pypaimon.common.options.config import CatalogOptions, FuseOptions from pypaimon.common.options.core_options import CoreOptions from pypaimon.common.file_io import FileIO from pypaimon.common.identifier import Identifier +from pypaimon.filesystem.local_file_io import LocalFileIO from pypaimon.schema.schema import Schema from pypaimon.schema.schema_change import SchemaChange from pypaimon.schema.table_schema import TableSchema @@ -45,6 +48,8 @@ from pypaimon.table.format.format_table import FormatTable, Format from pypaimon.table.iceberg.iceberg_table import IcebergTable +logger = logging.getLogger(__name__) + FORMAT_TABLE_TYPE = "format-table" ICEBERG_TABLE_TYPE = "iceberg-table" @@ -57,6 +62,15 @@ def __init__(self, context: CatalogContext, config_required: Optional[bool] = Tr context.prefer_io_loader, context.fallback_io_loader) self.data_token_enabled = self.rest_api.options.get(CatalogOptions.DATA_TOKEN_ENABLED) + # FUSE local path configuration + self.fuse_local_path_enabled = self.context.options.get( + FuseOptions.FUSE_LOCAL_PATH_ENABLED, False) + self.fuse_local_path_root = self.context.options.get( + FuseOptions.FUSE_LOCAL_PATH_ROOT) + self.fuse_validation_mode = self.context.options.get( + FuseOptions.FUSE_LOCAL_PATH_VALIDATION_MODE, "strict") + self._fuse_validation_state = None # None=not validated, True=passed, False=failed + def catalog_loader(self): """ Create and return a RESTCatalogLoader for this catalog. @@ -338,9 +352,107 @@ def file_io_from_options(self, table_path: str) -> FileIO: return FileIO.get(table_path, self.context.options) def file_io_for_data(self, table_path: str, identifier: Identifier): + """ + Get FileIO for data access, supporting FUSE local path mapping. + """ + # Try to use FUSE local path + if self.fuse_local_path_enabled: + # Configuration error raises exception directly + local_path = self._resolve_fuse_local_path(table_path) + + # Perform validation (only once) + if self._fuse_validation_state is None: + self._validate_fuse_path() + + # Validation passed, return local FileIO + if self._fuse_validation_state: + return LocalFileIO(local_path, self.context.options) + + # warn mode validation failed, fallback to default FileIO + return RESTTokenFileIO(identifier, table_path, self.context.options) \ + if self.data_token_enabled else self.file_io_from_options(table_path) + + # Fallback to original logic return RESTTokenFileIO(identifier, table_path, self.context.options) \ if self.data_token_enabled else self.file_io_from_options(table_path) + def _resolve_fuse_local_path(self, original_path: str) -> str: + """ + Resolve FUSE local path. + + FUSE mount point is mapped to catalog level, so skip the catalog name in the path. + + Returns: + Local path + + Raises: + ValueError: If fuse.local-path.root is not configured + """ + if not self.fuse_local_path_root: + raise ValueError( + "FUSE local path is enabled but fuse.local-path.root is not configured" + ) + + uri = urlparse(original_path) + + # For URIs with scheme (e.g., oss://bucket/db/table): + # - netloc is the bucket name (which corresponds to catalog name) + # - path is the rest (e.g., /db/table) + # We skip the catalog/bucket level and keep only db/table path. + if uri.scheme: + # Skip netloc (bucket/catalog), only use path part + path_part = uri.path.lstrip('/') + else: + # No scheme: path format is "catalog/db/table", skip first segment + path_part = original_path.lstrip('/') + segments = path_part.split('/') + if len(segments) > 1: + path_part = '/'.join(segments[1:]) + + return f"{self.fuse_local_path_root.rstrip('/')}/{path_part}" + + def _validate_fuse_path(self) -> None: + """ + Validate FUSE local path is correctly mounted. + + Get default database's location, convert to local path and check if it exists. + """ + if self.fuse_validation_mode == "none": + self._fuse_validation_state = True + return + + # Get default database details, API call failure raises exception directly + db = self.rest_api.get_database("default") + remote_location = db.location + + if not remote_location: + logger.info("Default database has no location, skipping FUSE validation") + self._fuse_validation_state = True + return + + expected_local = self._resolve_fuse_local_path(remote_location) + local_file_io = LocalFileIO(expected_local, self.context.options) + + # Only validate if local path exists, handle based on validation mode + if not local_file_io.exists(expected_local): + error_msg = ( + f"FUSE local path validation failed: " + f"local path '{expected_local}' does not exist " + f"for default database location '{remote_location}'" + ) + self._handle_validation_error(error_msg) + else: + self._fuse_validation_state = True + logger.info("FUSE local path validation passed") + + def _handle_validation_error(self, error_msg: str) -> None: + """Handle validation error based on validation mode.""" + if self.fuse_validation_mode == "strict": + raise ValueError(error_msg) + elif self.fuse_validation_mode == "warn": + logger.warning(f"{error_msg}. Falling back to default FileIO.") + self._fuse_validation_state = False # Mark validation failed, fallback to default FileIO + def load_table(self, identifier: Identifier, internal_file_io: Callable[[str], Any], diff --git a/paimon-python/pypaimon/common/options/config.py b/paimon-python/pypaimon/common/options/config.py index fb6a46446ea0..6f5d290495c0 100644 --- a/paimon-python/pypaimon/common/options/config.py +++ b/paimon-python/pypaimon/common/options/config.py @@ -83,3 +83,28 @@ class CatalogOptions: HTTP_USER_AGENT_HEADER = ConfigOptions.key( "header.HTTP_USER_AGENT").string_type().no_default_value().with_description("HTTP User Agent header") BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1 + + +class FuseOptions: + """FUSE local path configuration options.""" + + FUSE_LOCAL_PATH_ENABLED = ( + ConfigOptions.key("fuse.local-path.enabled") + .boolean_type() + .default_value(False) + .with_description("Whether to enable FUSE local path mapping") + ) + + FUSE_LOCAL_PATH_ROOT = ( + ConfigOptions.key("fuse.local-path.root") + .string_type() + .no_default_value() + .with_description("FUSE mounted local root path, e.g., /mnt/fuse/warehouse") + ) + + FUSE_LOCAL_PATH_VALIDATION_MODE = ( + ConfigOptions.key("fuse.local-path.validation-mode") + .string_type() + .default_value("strict") + .with_description("Validation mode: strict, warn, or none") + ) diff --git a/paimon-python/pypaimon/tests/rest/test_fuse_local_path.py b/paimon-python/pypaimon/tests/rest/test_fuse_local_path.py new file mode 100644 index 000000000000..a1e8e4cc63a5 --- /dev/null +++ b/paimon-python/pypaimon/tests/rest/test_fuse_local_path.py @@ -0,0 +1,230 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import unittest +from unittest.mock import MagicMock, patch + +from pypaimon.catalog.rest.rest_catalog import RESTCatalog +from pypaimon.common.options import Options +from pypaimon.common.options.config import FuseOptions + + +class TestFuseLocalPath(unittest.TestCase): + """Test cases for FUSE local path functionality.""" + + def _create_catalog_with_fuse( + self, + enabled: bool = True, + root: str = "/mnt/fuse/warehouse", + validation_mode: str = "strict" + ) -> RESTCatalog: + """Helper to create a mock RESTCatalog with FUSE configuration.""" + options = Options({ + "uri": "http://localhost:8080", + "warehouse": "oss://catalog/warehouse", + FuseOptions.FUSE_LOCAL_PATH_ENABLED.key(): str(enabled).lower(), + FuseOptions.FUSE_LOCAL_PATH_ROOT.key(): root, + FuseOptions.FUSE_LOCAL_PATH_VALIDATION_MODE.key(): validation_mode, + }) + + # Create a mock catalog directly without going through __init__ + catalog = MagicMock(spec=RESTCatalog) + catalog.fuse_local_path_enabled = enabled + catalog.fuse_local_path_root = root + catalog.fuse_validation_mode = validation_mode + catalog._fuse_validation_state = None + catalog.data_token_enabled = False + catalog.rest_api = MagicMock() + catalog.context = MagicMock() + catalog.context.options = options + + # Bind actual methods to the mock + catalog._resolve_fuse_local_path = RESTCatalog._resolve_fuse_local_path.__get__(catalog) + catalog._validate_fuse_path = RESTCatalog._validate_fuse_path.__get__(catalog) + catalog._handle_validation_error = RESTCatalog._handle_validation_error.__get__(catalog) + catalog.file_io_for_data = RESTCatalog.file_io_for_data.__get__(catalog) + catalog.file_io_from_options = MagicMock(return_value=MagicMock()) + + return catalog + + # ========== _resolve_fuse_local_path Tests ========== + + def test_resolve_fuse_local_path_basic(self): + """Test basic path conversion.""" + catalog = self._create_catalog_with_fuse() + + result = catalog._resolve_fuse_local_path("oss://catalog/db1/table1") + self.assertEqual(result, "/mnt/fuse/warehouse/db1/table1") + + def test_resolve_fuse_local_path_with_trailing_slash(self): + """Test fuse_root with trailing slash.""" + catalog = self._create_catalog_with_fuse(root="/mnt/fuse/warehouse/") + + result = catalog._resolve_fuse_local_path("oss://catalog/db1/table1") + self.assertEqual(result, "/mnt/fuse/warehouse/db1/table1") + + def test_resolve_fuse_local_path_deep_path(self): + """Test deep path with multiple levels.""" + catalog = self._create_catalog_with_fuse() + + result = catalog._resolve_fuse_local_path( + "oss://catalog/db1/table1/partition1/file.parquet" + ) + self.assertEqual( + result, + "/mnt/fuse/warehouse/db1/table1/partition1/file.parquet" + ) + + def test_resolve_fuse_local_path_without_scheme(self): + """Test path without scheme.""" + catalog = self._create_catalog_with_fuse() + + result = catalog._resolve_fuse_local_path("catalog/db1/table1") + self.assertEqual(result, "/mnt/fuse/warehouse/db1/table1") + + def test_resolve_fuse_local_path_missing_root(self): + """Test error when root is not configured.""" + catalog = self._create_catalog_with_fuse(root=None) + + with self.assertRaises(ValueError) as context: + catalog._resolve_fuse_local_path("oss://catalog/db1/table1") + + self.assertIn("fuse.local-path.root is not configured", str(context.exception)) + + # ========== Validation Tests ========== + + def test_validation_mode_none_skips_validation(self): + """Test none mode skips validation.""" + catalog = self._create_catalog_with_fuse(validation_mode="none") + + catalog._validate_fuse_path() + + self.assertTrue(catalog._fuse_validation_state) + + def test_validation_mode_strict_raises_on_failure(self): + """Test strict mode raises exception on validation failure.""" + catalog = self._create_catalog_with_fuse(validation_mode="strict") + + # Mock default database with location + mock_db = MagicMock() + mock_db.location = "oss://catalog/default" + catalog.rest_api.get_database.return_value = mock_db + + # Mock LocalFileIO to return False for exists + with patch('pypaimon.catalog.rest.rest_catalog.LocalFileIO') as mock_local_io: + mock_instance = MagicMock() + mock_instance.exists.return_value = False + mock_local_io.return_value = mock_instance + + with self.assertRaises(ValueError) as context: + catalog._validate_fuse_path() + + self.assertIn("FUSE local path validation failed", str(context.exception)) + + def test_validation_mode_warn_fallback_on_failure(self): + """Test warn mode falls back to default FileIO on validation failure.""" + catalog = self._create_catalog_with_fuse(validation_mode="warn") + + # Mock default database with location + mock_db = MagicMock() + mock_db.location = "oss://catalog/default" + catalog.rest_api.get_database.return_value = mock_db + + # Mock LocalFileIO to return False for exists + with patch('pypaimon.catalog.rest.rest_catalog.LocalFileIO') as mock_local_io: + mock_instance = MagicMock() + mock_instance.exists.return_value = False + mock_local_io.return_value = mock_instance + + # Should not raise, just set state to False + catalog._validate_fuse_path() + + self.assertFalse(catalog._fuse_validation_state) + + def test_validation_passes_when_local_exists(self): + """Test validation passes when local path exists.""" + catalog = self._create_catalog_with_fuse(validation_mode="strict") + + # Mock default database with location + mock_db = MagicMock() + mock_db.location = "oss://catalog/default" + catalog.rest_api.get_database.return_value = mock_db + + # Mock LocalFileIO to return True for exists + with patch('pypaimon.catalog.rest.rest_catalog.LocalFileIO') as mock_local_io: + mock_instance = MagicMock() + mock_instance.exists.return_value = True + mock_local_io.return_value = mock_instance + + catalog._validate_fuse_path() + + self.assertTrue(catalog._fuse_validation_state) + + def test_validation_skips_when_no_location(self): + """Test validation skips when default database has no location.""" + catalog = self._create_catalog_with_fuse(validation_mode="strict") + + # Mock default database without location + mock_db = MagicMock() + mock_db.location = None + catalog.rest_api.get_database.return_value = mock_db + + catalog._validate_fuse_path() + + self.assertTrue(catalog._fuse_validation_state) + + # ========== file_io_for_data Tests ========== + + def test_file_io_for_data_disabled_fuse(self): + """Test that disabled FUSE uses default FileIO.""" + catalog = self._create_catalog_with_fuse(enabled=False) + catalog.data_token_enabled = False + + from pypaimon.common.identifier import Identifier + identifier = Identifier.create("db1", "table1") + + _ = catalog.file_io_for_data("oss://catalog/db1/table1", identifier) + catalog.file_io_from_options.assert_called_once() + + def test_file_io_for_data_uses_local_when_validated(self): + """Test that validated FUSE uses LocalFileIO.""" + catalog = self._create_catalog_with_fuse(enabled=True, validation_mode="none") + catalog._fuse_validation_state = True # Already validated + + from pypaimon.common.identifier import Identifier + identifier = Identifier.create("db1", "table1") + + with patch('pypaimon.catalog.rest.rest_catalog.LocalFileIO') as mock_local_io: + mock_local_io.return_value = MagicMock() + _ = catalog.file_io_for_data("oss://catalog/db1/table1", identifier) + mock_local_io.assert_called_once() + + def test_file_io_for_data_fallback_when_validation_failed(self): + """Test that failed validation falls back to default FileIO.""" + catalog = self._create_catalog_with_fuse(enabled=True, validation_mode="warn") + catalog._fuse_validation_state = False # Validation failed + catalog.data_token_enabled = False + + from pypaimon.common.identifier import Identifier + identifier = Identifier.create("db1", "table1") + + _ = catalog.file_io_for_data("oss://catalog/db1/table1", identifier) + catalog.file_io_from_options.assert_called_once() + + +if __name__ == '__main__': + unittest.main()