Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
a5a9167
[design] Add FUSE local path configuration design for RESTCatalog
shyjsarah Mar 13, 2026
69d4bcd
[design] Simplify FUSE validation: remove NIO FileStore option, use S…
shyjsarah Mar 13, 2026
f86b2be
[design] Use LocalFileIO instead of Java NIO Files API for FUSE valid…
shyjsarah Mar 13, 2026
89323f9
[design] Replace OSS with 'remote storage' terminology
shyjsarah Mar 13, 2026
fa8af10
[design] Clarify that getTableToken is still needed for validation
shyjsarah Mar 13, 2026
8ab6366
[design] Simplify background description, remove getTableToken refere…
shyjsarah Mar 13, 2026
846a2fd
[design] Remove obsolete validation schemes
shyjsarah Mar 13, 2026
110d396
[design] Fix comment about tables without schema
shyjsarah Mar 13, 2026
6ffefc5
[design] Update validation table for tables without schema
shyjsarah Mar 13, 2026
82c0aef
[design] Add .paimon-identifier as first validation step
shyjsarah Mar 13, 2026
ba44dc0
[design] Simplify .paimon-identifier to only contain UUID
shyjsarah Mar 13, 2026
4eece09
[design] Simplify createLocalFileIO to use existing context
shyjsarah Mar 13, 2026
6e44797
[design] Rename .paimon-identifier to .identifier
shyjsarah Mar 13, 2026
45a13e9
[design] Remove created-at from .identifier file format
shyjsarah Mar 13, 2026
bab21e5
[design] Change .identifier format to JSON, use JsonSerdeUtil for par…
shyjsarah Mar 13, 2026
2397e29
[design] Change field name from table-uuid to uuid for consistency wi…
shyjsarah Mar 13, 2026
b792bff
Add FUSE error handling design
shyjsarah Mar 13, 2026
9995d2a
Simplify FUSE error handling to focus on FUSE-specific errors only
shyjsarah Mar 13, 2026
abec068
Add configurable retry with exponential backoff for FUSE error handling
shyjsarah Mar 13, 2026
211abaf
update design
shyjsarah Mar 19, 2026
bbf6f5c
Implement FUSE local path support for REST Catalog
shyjsarah Mar 19, 2026
54fea93
Add documentation for FUSE local path feature
shyjsarah Mar 19, 2026
8621ea2
Remove design documents
shyjsarah Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions docs/content/pypaimon/fuse-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
---
title: "FUSE Support"
weight: 7
type: docs
aliases:
- /pypaimon/fuse-support.html
---

# FUSE Support

When using PyPaimon REST Catalog to access remote object storage (such as OSS, S3, or HDFS), data access typically goes through remote storage SDKs. However, in scenarios where remote storage paths are mounted locally via FUSE (Filesystem in Userspace), users can access data directly through local filesystem paths for better performance.

This feature enables PyPaimon to use local file access when FUSE mount is available, bypassing remote storage SDKs.

## Configuration

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `fuse.local-path.enabled` | Boolean | `false` | Whether to enable FUSE local path mapping |
| `fuse.local-path.root` | String | (none) | FUSE mounted local root path, e.g., `/mnt/fuse/warehouse` |
| `fuse.local-path.validation-mode` | String | `strict` | Validation mode: `strict`, `warn`, or `none` |

## Usage

```python
from pypaimon import CatalogFactory

catalog_options = {
'metastore': 'rest',
'uri': 'http://rest-server:8080',
'warehouse': 'oss://my-catalog/',
'token.provider': 'xxx',

# FUSE local path configuration
'fuse.local-path.enabled': 'true',
'fuse.local-path.root': '/mnt/fuse/warehouse',
'fuse.local-path.validation-mode': 'strict'
}

catalog = CatalogFactory.create(catalog_options)
```

## Validation Modes

Validation is performed on first data access to verify FUSE mount correctness. The `validation-mode` controls behavior when the local path does not exist:

| Mode | Behavior | Use Case |
|------|----------|----------|
| `strict` | Throw exception, block operation | Production, safety first |
| `warn` | Log warning, fallback to default FileIO | Testing, compatibility first |
| `none` | Skip validation, use directly | Trusted environment, performance first |

**Note**: Configuration errors (e.g., `fuse.local-path.enabled=true` but `fuse.local-path.root` not configured) will throw exceptions directly, regardless of validation mode.

## How It Works

1. When `fuse.local-path.enabled=true`, PyPaimon attempts to use local file access
2. On first data access, validation is triggered (unless mode is `none`)
3. Validation fetches the `default` database location and converts it to local path
4. If local path exists, subsequent data access uses `LocalFileIO`
5. If validation fails, behavior depends on `validation-mode`

## Example Scenario

Assume you have:
- Remote storage: `oss://my-catalog/`
- FUSE mount: `/mnt/fuse/warehouse` (mounted to `oss://my-catalog/`)

```python
from pypaimon import CatalogFactory

# Create catalog with FUSE enabled
catalog = CatalogFactory.create({
'metastore': 'rest',
'uri': 'http://rest-server:8080',
'warehouse': 'oss://my-catalog/',
'fuse.local-path.enabled': 'true',
'fuse.local-path.root': '/mnt/fuse/warehouse'
})

# When reading table data, PyPaimon will:
# 1. Convert "oss://my-catalog/db/table" to "/mnt/fuse/warehouse/db/table"
# 2. Use LocalFileIO to read from local path
# 3. Bypass remote OSS SDK for better performance
table = catalog.get_table('db.table')
reader = table.new_read_builder().new_read()
```

## Limitations

- Only catalog-level FUSE mount is supported (single `fuse.local-path.root` configuration)
- Validation only checks if local path exists, not data consistency
- If FUSE mount becomes unavailable after validation, file operations may fail
114 changes: 113 additions & 1 deletion paimon-python/pypaimon/catalog/rest/rest_catalog.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
from typing import Any, Callable, Dict, List, Optional, Union
from urllib.parse import urlparse

from pypaimon.api.api_response import GetTableResponse, PagedList
from pypaimon.api.rest_api import RESTApi
Expand All @@ -32,10 +34,11 @@
from pypaimon.catalog.rest.property_change import PropertyChange
from pypaimon.catalog.rest.rest_token_file_io import RESTTokenFileIO
from pypaimon.catalog.rest.table_metadata import TableMetadata
from pypaimon.common.options.config import CatalogOptions
from pypaimon.common.options.config import CatalogOptions, FuseOptions
from pypaimon.common.options.core_options import CoreOptions
from pypaimon.common.file_io import FileIO
from pypaimon.common.identifier import Identifier
from pypaimon.filesystem.local_file_io import LocalFileIO
from pypaimon.schema.schema import Schema
from pypaimon.schema.schema_change import SchemaChange
from pypaimon.schema.table_schema import TableSchema
Expand All @@ -45,6 +48,8 @@
from pypaimon.table.format.format_table import FormatTable, Format
from pypaimon.table.iceberg.iceberg_table import IcebergTable

logger = logging.getLogger(__name__)

FORMAT_TABLE_TYPE = "format-table"
ICEBERG_TABLE_TYPE = "iceberg-table"

Expand All @@ -57,6 +62,15 @@ def __init__(self, context: CatalogContext, config_required: Optional[bool] = Tr
context.prefer_io_loader, context.fallback_io_loader)
self.data_token_enabled = self.rest_api.options.get(CatalogOptions.DATA_TOKEN_ENABLED)

# FUSE local path configuration
self.fuse_local_path_enabled = self.context.options.get(
FuseOptions.FUSE_LOCAL_PATH_ENABLED, False)
self.fuse_local_path_root = self.context.options.get(
FuseOptions.FUSE_LOCAL_PATH_ROOT)
self.fuse_validation_mode = self.context.options.get(
FuseOptions.FUSE_LOCAL_PATH_VALIDATION_MODE, "strict")
self._fuse_validation_state = None # None=not validated, True=passed, False=failed

def catalog_loader(self):
"""
Create and return a RESTCatalogLoader for this catalog.
Expand Down Expand Up @@ -338,9 +352,107 @@ def file_io_from_options(self, table_path: str) -> FileIO:
return FileIO.get(table_path, self.context.options)

def file_io_for_data(self, table_path: str, identifier: Identifier):
"""
Get FileIO for data access, supporting FUSE local path mapping.
"""
# Try to use FUSE local path
if self.fuse_local_path_enabled:
# Configuration error raises exception directly
local_path = self._resolve_fuse_local_path(table_path)

# Perform validation (only once)
if self._fuse_validation_state is None:
self._validate_fuse_path()

# Validation passed, return local FileIO
if self._fuse_validation_state:
return LocalFileIO(local_path, self.context.options)

# warn mode validation failed, fallback to default FileIO
return RESTTokenFileIO(identifier, table_path, self.context.options) \
if self.data_token_enabled else self.file_io_from_options(table_path)

# Fallback to original logic
return RESTTokenFileIO(identifier, table_path, self.context.options) \
if self.data_token_enabled else self.file_io_from_options(table_path)

def _resolve_fuse_local_path(self, original_path: str) -> str:
"""
Resolve FUSE local path.

FUSE mount point is mapped to catalog level, so skip the catalog name in the path.

Returns:
Local path

Raises:
ValueError: If fuse.local-path.root is not configured
"""
if not self.fuse_local_path_root:
raise ValueError(
"FUSE local path is enabled but fuse.local-path.root is not configured"
)

uri = urlparse(original_path)

# For URIs with scheme (e.g., oss://bucket/db/table):
# - netloc is the bucket name (which corresponds to catalog name)
# - path is the rest (e.g., /db/table)
# We skip the catalog/bucket level and keep only db/table path.
if uri.scheme:
# Skip netloc (bucket/catalog), only use path part
path_part = uri.path.lstrip('/')
else:
# No scheme: path format is "catalog/db/table", skip first segment
path_part = original_path.lstrip('/')
segments = path_part.split('/')
if len(segments) > 1:
path_part = '/'.join(segments[1:])

return f"{self.fuse_local_path_root.rstrip('/')}/{path_part}"

def _validate_fuse_path(self) -> None:
"""
Validate FUSE local path is correctly mounted.

Get default database's location, convert to local path and check if it exists.
"""
if self.fuse_validation_mode == "none":
self._fuse_validation_state = True
return

# Get default database details, API call failure raises exception directly
db = self.rest_api.get_database("default")
remote_location = db.location

if not remote_location:
logger.info("Default database has no location, skipping FUSE validation")
self._fuse_validation_state = True
return

expected_local = self._resolve_fuse_local_path(remote_location)
local_file_io = LocalFileIO(expected_local, self.context.options)

# Only validate if local path exists, handle based on validation mode
if not local_file_io.exists(expected_local):
error_msg = (
f"FUSE local path validation failed: "
f"local path '{expected_local}' does not exist "
f"for default database location '{remote_location}'"
)
self._handle_validation_error(error_msg)
else:
self._fuse_validation_state = True
logger.info("FUSE local path validation passed")

def _handle_validation_error(self, error_msg: str) -> None:
"""Handle validation error based on validation mode."""
if self.fuse_validation_mode == "strict":
raise ValueError(error_msg)
elif self.fuse_validation_mode == "warn":
logger.warning(f"{error_msg}. Falling back to default FileIO.")
self._fuse_validation_state = False # Mark validation failed, fallback to default FileIO

def load_table(self,
identifier: Identifier,
internal_file_io: Callable[[str], Any],
Expand Down
25 changes: 25 additions & 0 deletions paimon-python/pypaimon/common/options/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,28 @@ class CatalogOptions:
HTTP_USER_AGENT_HEADER = ConfigOptions.key(
"header.HTTP_USER_AGENT").string_type().no_default_value().with_description("HTTP User Agent header")
BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2 ** 31 - 1


class FuseOptions:
"""FUSE local path configuration options."""

FUSE_LOCAL_PATH_ENABLED = (
ConfigOptions.key("fuse.local-path.enabled")
.boolean_type()
.default_value(False)
.with_description("Whether to enable FUSE local path mapping")
)

FUSE_LOCAL_PATH_ROOT = (
ConfigOptions.key("fuse.local-path.root")
.string_type()
.no_default_value()
.with_description("FUSE mounted local root path, e.g., /mnt/fuse/warehouse")
)

FUSE_LOCAL_PATH_VALIDATION_MODE = (
ConfigOptions.key("fuse.local-path.validation-mode")
.string_type()
.default_value("strict")
.with_description("Validation mode: strict, warn, or none")
)
Loading
Loading