feat: the source and sink APIs of Flink-lance have been refactored (FLIP-143/FLIP-27) #15

Open
fightBoxing wants to merge 9 commits into main from review-source-sink

@fightBoxing
Collaborator

  • Based on the trino-lance plugin, the source and sink APIs of Flink-lance have been refactored.

  • Code architecture diagram

    [Image: Clipboard_Screenshot_1770775901]
  • Implementation details

    • Migrate Source from RichParallelSourceFunction to Source V2 API (FLIP-27)

      • Add LanceSource, LanceSourceReader, LanceSplitEnumerator
      • Add LanceSourceSplit, LanceEnumeratorState with serializers
      • Support checkpoint/recovery, parallel split assignment
      • Update LanceDynamicTableSource to use SourceProvider
    • Migrate Sink from RichSinkFunction to Sink V2 API (FLIP-143)

      • Add LanceSink, LanceSinkWriter
      • Support APPEND and OVERWRITE write modes
      • Support checkpoint flush and auto batch flush
      • Update LanceDynamicTableSink to use SinkV2Provider
      • Fix Append mode missing read_version parameter bug
    • Add comprehensive unit tests

      • LanceSourceV2Test: 23 tests covering Split, Serializer, State, Source
      • LanceSinkV2Test: 13 tests covering Writer, write modes, checkpoint
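
To illustrate the split-plus-serializer pairing listed above, here is a minimal plain-Java sketch of a versioned split serializer. It mirrors the getVersion/serialize/deserialize shape of Flink's SimpleVersionedSerializer but does not implement that interface, so it runs without Flink on the classpath. The class names and the split fields (splitId, datasetPath, fragmentId) are assumptions for illustration, not the fields of the actual LanceSourceSplit.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical split: one fragment of one Lance dataset. Field names are assumptions. */
final class SplitSketch {
  final String splitId;
  final String datasetPath;
  final int fragmentId;

  SplitSketch(String splitId, String datasetPath, int fragmentId) {
    this.splitId = splitId;
    this.datasetPath = datasetPath;
    this.fragmentId = fragmentId;
  }
}

/** Hypothetical serializer; same method shape as Flink's SimpleVersionedSerializer. */
final class SplitSerializerSketch {
  static final int VERSION = 1;

  int getVersion() {
    return VERSION;
  }

  /** Writes the fields in a fixed order so deserialize can read them back. */
  byte[] serialize(SplitSketch split) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(split.splitId);
      out.writeUTF(split.datasetPath);
      out.writeInt(split.fragmentId);
    }
    return bytes.toByteArray();
  }

  /** Rejects unknown versions instead of guessing at the byte layout. */
  SplitSketch deserialize(int version, byte[] data) throws IOException {
    if (version != VERSION) {
      throw new IOException("Unsupported split version: " + version);
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      String splitId = in.readUTF();
      String datasetPath = in.readUTF();
      int fragmentId = in.readInt();
      return new SplitSketch(splitId, datasetPath, fragmentId);
    }
  }
}
```

Carrying an explicit version alongside the bytes is what lets a restored checkpoint be read by a newer build of the connector; an enumerator-state serializer would follow the same pattern.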

rockyyin added 6 commits February 11, 2026 10:01
- Migrate Source from RichParallelSourceFunction to Source V2 API (FLIP-27)
  - Add LanceSource, LanceSourceReader, LanceSplitEnumerator
  - Add LanceSourceSplit, LanceEnumeratorState with serializers
  - Support checkpoint/recovery, parallel split assignment
  - Update LanceDynamicTableSource to use SourceProvider

- Migrate Sink from RichSinkFunction to Sink V2 API (FLIP-143)
  - Add LanceSink, LanceSinkWriter
  - Support APPEND and OVERWRITE write modes
  - Support checkpoint flush and auto batch flush
  - Update LanceDynamicTableSink to use SinkV2Provider
  - Fix Append mode missing read_version parameter bug

- Add comprehensive unit tests
  - LanceSourceV2Test: 23 tests covering Split, Serializer, State, Source
  - LanceSinkV2Test: 13 tests covering Writer, write modes, checkpoint
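
The two flush triggers named in this commit (auto batch flush and checkpoint flush) can be sketched in plain Java as follows. This is not the actual LanceSinkWriter: the class name is hypothetical, and the `Consumer<List<T>>` callback stands in for whatever call writes a batch to Lance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical buffering writer illustrating the two flush triggers. */
final class BufferingWriterSketch<T> {
  private final int batchSize;
  private final Consumer<List<T>> batchSink; // stand-in for the real Lance write call
  private final List<T> buffer = new ArrayList<>();

  BufferingWriterSketch(int batchSize, Consumer<List<T>> batchSink) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.batchSink = batchSink;
  }

  /** Auto batch flush: emit as soon as the buffer reaches batchSize. */
  void write(T row) {
    buffer.add(row);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Checkpoint flush: emit whatever is buffered, even a partial batch. */
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    batchSink.accept(new ArrayList<>(buffer)); // copy so the callback owns its batch
    buffer.clear();
  }
}
```

In a Sink V2 writer the checkpoint-time call would come from the framework's flush hook; here it is simply a method the caller invokes.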

- All comments and messages in English
- Add Maven Wrapper (mvnw) with Maven 3.9.9
- Add GitHub Actions workflows (CI, release, publish, auto-bump, PR title check)
- Add version management with bumpversion and CI scripts
- Add Makefile for common build commands
- Add checkstyle configuration
- Update pom.xml with release/deploy profiles and version management
- Update .gitignore for build artifacts
- Remove trailing whitespace from all Java source files
- Fix LineLength violations (>120 chars) by wrapping long lines:
  - Long throw/exception messages split across lines
  - Long Javadoc comments wrapped at sentence boundaries
  - Long method signatures with multiple parameters/throws
  - Long API call chains broken at method calls
- Remove 21 unused imports across multiple files
- Add braces to single-line if/else (NeedBraces)
- Remove redundant modifier in LanceInputFormat (RedundantModifier)
- Rename method 'm()' to 'maxEdges()' in LanceIndexBuilder (MethodName)
- Exclude test sources from checkstyle (includeTestSourceDirectory=false)
- Change checkstyle LineLength max from 100 to 120
- Apply google-java-format via spotless:apply
- Standardize license header format
- Fix indentation (4 spaces -> 2 spaces per google-java-format)
- Fix import ordering and grouping
- Fix Javadoc formatting
- 42 files reformatted
- Add buildInFilter() method to convert Flink IN expressions to Lance SQL
- Generates 'field IN (val1, val2, ...)' filter syntax
- Fixes testInPredicatePushDown test failure
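
As a rough illustration of the 'field IN (val1, val2, ...)' output described above, the following plain-Java sketch builds such a filter string from a field name and a list of literal values. The real buildInFilter() works on Flink expressions; the signature here, the class name, and the quoting rule (single quotes, embedded quotes doubled) are all assumptions.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper; not the connector's actual buildInFilter(). */
final class InFilterSketch {
  /** Renders one literal. Strings are single-quoted with embedded quotes doubled (assumed rule). */
  static String literal(Object value) {
    if (value == null) {
      return "NULL";
    }
    if (value instanceof Number || value instanceof Boolean) {
      return value.toString();
    }
    return "'" + value.toString().replace("'", "''") + "'";
  }

  /** Builds "field IN (v1, v2, ...)"; returns null when there is nothing to push down. */
  static String buildInFilter(String field, List<?> values) {
    if (field == null || values == null || values.isEmpty()) {
      return null;
    }
    return values.stream()
        .map(InFilterSketch::literal)
        .collect(Collectors.joining(", ", field + " IN (", ")"));
  }
}
```

Returning null for an empty value list lets the caller treat the predicate as not pushable and leave it to Flink to evaluate.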
…e LanceSourceHandle

P1-A: Split LanceOptions God Object
- Extract LanceSourceOptions (read-side: path, batchSize, limit, columns, filter)
- Extract LanceSinkOptions (write-side: path, batchSize, writeMode, maxRowsPerFile)
- Extract LanceIndexOptions (index-building: indexType, metricType, PQ/HNSW params)
- Extract LanceVectorSearchOptions (vector-search: nprobes, ef, refineFactor)
- Add toSourceOptions/toSinkOptions/toIndexOptions/toVectorSearchOptions to LanceOptions
- All new classes are immutable with Builder pattern and validation
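
The "immutable with Builder pattern and validation" shape can be sketched as below. This is a reduced, hypothetical version of a read-side options class: only three of the fields from the commit message are shown, and the default batch size is an assumption.

```java
/** Hypothetical read-side options; a subset of the fields named in the commit message. */
final class SourceOptionsSketch {
  private final String path;
  private final int batchSize;
  private final Long limit; // null means "no limit"

  private SourceOptionsSketch(Builder builder) {
    this.path = builder.path;
    this.batchSize = builder.batchSize;
    this.limit = builder.limit;
  }

  String getPath() {
    return path;
  }

  int getBatchSize() {
    return batchSize;
  }

  Long getLimit() {
    return limit;
  }

  static Builder builder() {
    return new Builder();
  }

  static final class Builder {
    private String path;
    private int batchSize = 1024; // assumed default, not taken from the connector
    private Long limit;

    Builder path(String path) {
      this.path = path;
      return this;
    }

    Builder batchSize(int batchSize) {
      this.batchSize = batchSize;
      return this;
    }

    Builder limit(long limit) {
      this.limit = limit;
      return this;
    }

    /** Validation runs once here, so every built instance is known to be valid. */
    SourceOptionsSketch build() {
      if (path == null || path.trim().isEmpty()) {
        throw new IllegalArgumentException("path must not be null or empty");
      }
      if (batchSize <= 0) {
        throw new IllegalArgumentException("batchSize must be positive");
      }
      if (limit != null && limit < 0) {
        throw new IllegalArgumentException("limit must not be negative");
      }
      return new SourceOptionsSketch(this);
    }
  }
}
```

A call such as `SourceOptionsSketch.builder().path("/tmp/t.lance").batchSize(256).build()` yields an object with no setters, which is what makes it safe to share between the planner and runtime operators.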

P1-B: Introduce LanceSourceHandle immutable object
- Carry push-down state (projection, filters, limit, aggregateInfo) as immutable snapshot
- Defensive copies for arrays and unmodifiable lists
- Add getHandle() and buildSourceOptions() to LanceDynamicTableSource
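
A minimal sketch of the defensive-copy idea behind the handle, assuming a simplified state of projection indices, filter strings, and a limit (aggregateInfo is left out). The class and method names are hypothetical and do not reproduce the actual LanceSourceHandle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical immutable snapshot of push-down state. */
final class SourceHandleSketch {
  private final int[] projection; // selected column indices; null means all columns
  private final List<String> filters;
  private final Long limit; // null means no limit

  SourceHandleSketch(int[] projection, List<String> filters, Long limit) {
    // Copy on the way in, so later changes by the caller cannot leak into the handle.
    this.projection = projection == null ? null : projection.clone();
    this.filters =
        filters == null
            ? Collections.<String>emptyList()
            : Collections.unmodifiableList(new ArrayList<>(filters));
    this.limit = limit;
  }

  /** Copy on the way out, because arrays are always mutable. */
  int[] getProjection() {
    return projection == null ? null : projection.clone();
  }

  /** Safe to return directly: the list is an unmodifiable private copy. */
  List<String> getFilters() {
    return filters;
  }

  Long getLimit() {
    return limit;
  }

  /** A change produces a new snapshot; the existing handle is left untouched. */
  SourceHandleSketch withLimit(long newLimit) {
    return new SourceHandleSketch(projection, filters, newLimit);
  }
}
```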

Tests: 43 new tests covering all config classes, conversion, and handle immutability
@fightBoxing
Collaborator Author

Supplementary Explanation

The commit ID is 9bf9dd1.

  • Split LanceOptions and introduce LanceSourceHandle.

  • Split the LanceOptions God Object: the 849-line LanceOptions class is split into four immutable configuration classes, one per responsibility:

    • LanceSourceOptions — Read configuration

    • LanceSinkOptions — Write configuration

    • LanceIndexOptions — Index building configuration

    • LanceVectorSearchOptions — Vector search configuration

    Conversion methods (toSourceOptions, toSinkOptions, toIndexOptions, toVectorSearchOptions) were added to LanceOptions to maintain full backward compatibility.

  • Introduce LanceSourceHandle: an immutable handle object that encapsulates all push-down state (projection, filters, limit, aggregateInfo) and uses defensive copying to ensure thread safety.

…ogic

P2-A: Unified LanceDatasetFactory
- Extract LanceDatasetFactory utility in config package
- Provides open(path, allocator) with path validation and error wrapping
- Provides openManaged(path) with auto-managed allocator lifecycle
- Provides createAllocator(), validatePath(), closeQuietly() helpers
- ManagedDataset implements Closeable for try-with-resources pattern
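
The try-with-resources pattern mentioned above can be sketched without Lance or Arrow on the classpath. In this hypothetical version, two plain AutoCloseable handles stand in for the dataset and its allocator; the helper names follow the commit message, but their bodies are assumptions.

```java
import java.io.Closeable;

/** Hypothetical stand-in for the factory helpers; no Lance or Arrow types are used. */
final class DatasetFactorySketch {
  /** Rejects null or blank paths before any native resource is touched. */
  static String validatePath(String path) {
    if (path == null || path.trim().isEmpty()) {
      throw new IllegalArgumentException("Dataset path must not be null or empty");
    }
    return path.trim();
  }

  /** Null-safe close that never throws, for use in cleanup paths. */
  static void closeQuietly(AutoCloseable resource) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception ignored) {
      // Best-effort cleanup: a failure to close must not mask the original error.
    }
  }

  /** Owns both handles and releases them in reverse order of creation. */
  static final class ManagedDataset implements Closeable {
    private final AutoCloseable allocator; // stand-in for the allocator
    private final AutoCloseable dataset; // stand-in for the opened dataset

    ManagedDataset(AutoCloseable allocator, AutoCloseable dataset) {
      this.allocator = allocator;
      this.dataset = dataset;
    }

    @Override
    public void close() {
      closeQuietly(dataset); // the dataset depends on the allocator, so it goes first
      closeQuietly(allocator);
    }
  }
}
```

Used as `try (ManagedDataset ds = ...) { ... }`, the caller no longer has to remember to release the allocator separately, which is the boilerplate this commit removes.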

P2-B: Replace all 12 Dataset.open calls across 10 files
- LanceSource, LanceAggregateSource, LanceInputFormat (legacy sources)
- LanceSink (legacy sink)
- LanceSinkWriter, LanceSourceReader, LanceSplitEnumerator (V2 runtime)
- LanceIndexBuilder, LanceVectorSearch (utility classes)
- LanceCatalog (3 occurrences)
- Net reduction: 31 lines of duplicated boilerplate removed

Tests: 12 new tests for LanceDatasetFactory covering:
- Path validation (null, empty, valid)
- Allocator creation and cleanup
- Error handling for non-existent datasets
- closeQuietly null-safety

All 269 tests pass. Spotless + Checkstyle clean.
@fightBoxing
Collaborator Author

Supplementary Explanation

The commit ID is 4a3ff7e.

  • Unified LanceDatasetFactory: eliminates the duplicated Dataset-opening logic
  • Refactor the code directory structure

P3: Catalog Refactoring — Extract storage logic from LanceCatalog

New catalog subpackage: org.apache.flink.connector.lance.catalog

1. LanceCatalogPathResolver
   - Immutable path resolver for warehouse/database/table paths
   - Handles path normalization (trailing slash removal)
   - Detects remote storage (S3, GCS, Azure, HTTP/HTTPS)
   - resolveDatabasePath(), resolveTablePath()

2. LanceStorageProvider (interface)
   - Storage abstraction layer with 11 operations
   - initializeWarehouse, listDatabases, databaseExists, createDatabase
   - dropDatabase, listTables, tableExists, dropTable, renameTable
   - registerTable, configureEnvironment

3. LocalStorageProvider (implements LanceStorageProvider)
   - Local filesystem operations using java.nio.file
   - Lance dataset detection via _versions directory
   - Recursive directory deletion

4. RemoteStorageProvider (implements LanceStorageProvider)
   - In-memory ConcurrentHashMap registries for databases/tables
   - Lazy dataset existence probing via LanceDatasetFactory
   - Delegates to StorageEnvironmentManager for S3 credentials

5. StorageEnvironmentManager
   - Centralized S3/cloud credential management
   - Maps internal storage keys to AWS system properties
   - Maps internal keys to table-level connector options (toTableOptions)
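
The path-resolver responsibilities from item 1 (normalization, remote detection, path resolution) can be sketched as follows. The list of URI schemes and the `<warehouse>/<database>/<table>.lance` layout are assumptions made for illustration; the source names only the storage kinds, not the schemes or the on-disk layout.

```java
import java.util.Locale;

/** Hypothetical resolver; schemes and directory layout are assumed, not taken from the PR. */
final class PathResolverSketch {
  private static final String[] REMOTE_PREFIXES = {
    "s3://", "s3a://", "gs://", "az://", "http://", "https://"
  };

  private final String warehouse;

  PathResolverSketch(String warehouse) {
    this.warehouse = normalize(warehouse);
  }

  /** Strips trailing slashes but keeps a bare "/" and a bare "scheme://" intact. */
  static String normalize(String path) {
    String p = path.trim();
    while (p.length() > 1 && p.endsWith("/") && !p.endsWith("://")) {
      p = p.substring(0, p.length() - 1);
    }
    return p;
  }

  /** Case-insensitive scheme check against the assumed prefix list. */
  static boolean isRemote(String path) {
    String lower = path.toLowerCase(Locale.ROOT);
    for (String prefix : REMOTE_PREFIXES) {
      if (lower.startsWith(prefix)) {
        return true;
      }
    }
    return false;
  }

  boolean isRemoteStorage() {
    return isRemote(warehouse);
  }

  String resolveDatabasePath(String database) {
    return join(warehouse, database);
  }

  String resolveTablePath(String database, String table) {
    return join(resolveDatabasePath(database), table + ".lance");
  }

  private static String join(String parent, String child) {
    return parent.endsWith("/") ? parent + child : parent + "/" + child;
  }
}
```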

LanceCatalog changes:
- Replaced all if(isRemoteStorage) branches with StorageProvider delegation
- Removed 7 private methods: configureStorageEnvironment, getDatabasePath,
  getDatasetPath, getStorageOptionsForTable, normalizeWarehousePath,
  isRemotePath, deleteDirectory
- Uses LanceDatasetFactory.createAllocator() / closeQuietly()
- Added getPathResolver() and getStorageProvider() for testability
- Net reduction: ~200 lines of mixed local/remote branching logic
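
The replacement of if(isRemoteStorage) branches with delegation can be sketched like this. The interface below carries only two of the operations listed for LanceStorageProvider, and every class name is hypothetical; the point is that the catalog method contains no local/remote branching once a provider has been chosen.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical two-method subset of the storage abstraction. */
interface StorageProviderSketch {
  boolean databaseExists(String database);

  void createDatabase(String database);
}

/** Local variant: a database is a directory under the warehouse. */
final class LocalProviderSketch implements StorageProviderSketch {
  private final Path warehouse;

  LocalProviderSketch(Path warehouse) {
    this.warehouse = warehouse;
  }

  @Override
  public boolean databaseExists(String database) {
    return Files.isDirectory(warehouse.resolve(database));
  }

  @Override
  public void createDatabase(String database) {
    try {
      Files.createDirectories(warehouse.resolve(database));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

/** Remote variant: databases are tracked in an in-memory concurrent registry. */
final class RemoteProviderSketch implements StorageProviderSketch {
  private final Set<String> databases = ConcurrentHashMap.newKeySet();

  @Override
  public boolean databaseExists(String database) {
    return databases.contains(database);
  }

  @Override
  public void createDatabase(String database) {
    databases.add(database);
  }
}

/** The catalog picks a provider once and then only delegates. */
final class CatalogSketch {
  private final StorageProviderSketch storage;

  CatalogSketch(StorageProviderSketch storage) {
    this.storage = storage;
  }

  void createDatabase(String database, boolean ignoreIfExists) {
    if (storage.databaseExists(database)) {
      if (!ignoreIfExists) {
        throw new IllegalStateException("Database already exists: " + database);
      }
      return;
    }
    storage.createDatabase(database);
  }
}
```

Because the provider is injected, tests can exercise the catalog logic against the in-memory variant without touching a filesystem or a cloud endpoint.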

Tests: 57 new tests across 4 test classes:
- LanceCatalogPathResolverTest: 18 tests (normalization, remote detection, resolution)
- StorageEnvironmentManagerTest: 7 tests (configure, toTableOptions)
- LocalStorageProviderTest: 16 tests (init, db ops, table ops, no-ops)
- RemoteStorageProviderTest: 16 tests (registry, env config, edge cases)

All 326 tests pass. Spotless + Checkstyle clean.
@fightBoxing
Collaborator Author

Supplementary Explanation

The commit ID is 0ca69002188ca98a54c0d87f0e349a77fc5383a7.

  • Split and restructure the catalog
  • Code refactoring architecture diagram

    [Image: Clipboard_Screenshot_1770789980]

- LanceCatalog.createTable() now creates an empty Lance Dataset with
  the schema from CREATE TABLE DDL, persisting column info on disk
- LanceCatalog.getTable() merges user-provided table options (e.g.
  write.batch-size, write.mode) back into the returned CatalogTable
- LanceDynamicTableFactory: path is now optional (auto-injected by
  Catalog); declare S3 config options (s3-access-key, s3-secret-key,
  s3-region, s3-endpoint) as optional options
- LanceSinkWriter: support remote storage (S3/GCS/Azure) path existence
  checking via Dataset.open() probe instead of local Files.exists()
- Add 7 new integration tests covering CREATE TABLE lifecycle:
  dataset creation, user options preservation, duplicate detection,
  DROP TABLE, vector columns, custom databases, S3 options declaration

All 326 tests pass. Spotless + Checkstyle clean.