feat: the source and sink APIs of Flink-lance have been refactored (FLIP-143/FLIP-27) #15
Open
fightBoxing wants to merge 9 commits into main
added 6 commits
February 11, 2026 10:01
- Migrate Source from RichParallelSourceFunction to Source V2 API (FLIP-27)
  - Add LanceSource, LanceSourceReader, LanceSplitEnumerator
  - Add LanceSourceSplit, LanceEnumeratorState with serializers
  - Support checkpoint/recovery, parallel split assignment
  - Update LanceDynamicTableSource to use SourceProvider
- Migrate Sink from RichSinkFunction to Sink V2 API (FLIP-143)
  - Add LanceSink, LanceSinkWriter
  - Support APPEND and OVERWRITE write modes
  - Support checkpoint flush and auto batch flush
  - Update LanceDynamicTableSink to use SinkV2Provider
  - Fix Append mode missing read_version parameter bug
- Add comprehensive unit tests
  - LanceSourceV2Test: 23 tests covering Split, Serializer, State, Source
  - LanceSinkV2Test: 13 tests covering Writer, write modes, checkpoint
- All comments and messages in English

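
The commit above adds LanceSourceSplit together with a serializer so that splits survive checkpoint and recovery. As a rough illustration only, the sketch below shows a versioned split round-trip in plain Java. `SplitSketch`, its fields (`datasetPath`, `fragmentId`) and the split ID format are hypothetical and not taken from the PR; the method shapes mirror Flink's `SimpleVersionedSerializer` (`getVersion`, `serialize`, `deserialize`) without depending on Flink.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical split describing one fragment of a dataset; field names are assumptions. */
final class SplitSketch {
  final String datasetPath;
  final int fragmentId;

  SplitSketch(String datasetPath, int fragmentId) {
    this.datasetPath = datasetPath;
    this.fragmentId = fragmentId;
  }

  /** Splits need a stable string ID; this particular format is made up for the sketch. */
  String splitId() {
    return datasetPath + "#" + fragmentId;
  }
}

/** Versioned binary round-trip, shaped like Flink's SimpleVersionedSerializer. */
final class SplitSketchSerializer {
  static final int VERSION = 1;

  int getVersion() {
    return VERSION;
  }

  byte[] serialize(SplitSketch split) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(split.datasetPath);
      out.writeInt(split.fragmentId);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  SplitSketch deserialize(int version, byte[] serialized) {
    // Reject unknown versions so state written in an incompatible format fails loudly.
    if (version != VERSION) {
      throw new IllegalArgumentException("Unsupported split version: " + version);
    }
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
      // Fields are read in the same order they were written.
      return new SplitSketch(in.readUTF(), in.readInt());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Keeping an explicit version number in the serializer is what lets a later change to the split layout stay readable from older checkpoints.
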
- Add Maven Wrapper (mvnw) with Maven 3.9.9
- Add GitHub Actions workflows (CI, release, publish, auto-bump, PR title check)
- Add version management with bumpversion and CI scripts
- Add Makefile for common build commands
- Add checkstyle configuration
- Update pom.xml with release/deploy profiles and version management
- Update .gitignore for build artifacts

- Remove trailing whitespace from all Java source files
- Fix LineLength violations (>120 chars) by wrapping long lines:
  - Long throw/exception messages split across lines
  - Long Javadoc comments wrapped at sentence boundaries
  - Long method signatures with multiple parameters/throws
  - Long API call chains broken at method calls
- Remove 21 unused imports across multiple files
- Add braces to single-line if/else (NeedBraces)
- Remove redundant modifier in LanceInputFormat (RedundantModifier)
- Rename method 'm()' to 'maxEdges()' in LanceIndexBuilder (MethodName)
- Exclude test sources from checkstyle (includeTestSourceDirectory=false)
- Change checkstyle LineLength max from 100 to 120

- Apply google-java-format via spotless:apply
- Standardize license header format
- Fix indentation (4 spaces -> 2 spaces per google-java-format)
- Fix import ordering and grouping
- Fix Javadoc formatting
- 42 files reformatted

- Add buildInFilter() method to convert Flink IN expressions to Lance SQL
- Generates 'field IN (val1, val2, ...)' filter syntax
- Fixes testInPredicatePushDown test failure

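
To make the generated 'field IN (val1, val2, ...)' syntax concrete, here is a minimal sketch of such a conversion. It is not the PR's buildInFilter(): the real method works on Flink expressions, whereas this hypothetical `InFilterSketch.buildInFilter(String, List<?>)` takes an already-extracted field name and literal values, and the quoting rule (single quotes, embedded quotes doubled) is an assumption about the target SQL dialect.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper; the signature is an assumption, not the connector's API. */
final class InFilterSketch {

  /** Renders one literal: numbers and booleans as-is, everything else single-quoted. */
  static String literal(Object value) {
    if (value instanceof Number || value instanceof Boolean) {
      return value.toString();
    }
    // Assumed escaping rule: double any embedded single quote.
    return "'" + String.valueOf(value).replace("'", "''") + "'";
  }

  /** Builds "field IN (v1, v2, ...)" from a field name and its candidate values. */
  static String buildInFilter(String field, List<?> values) {
    if (values.isEmpty()) {
      throw new IllegalArgumentException("IN predicate needs at least one value");
    }
    String joined = values.stream()
        .map(InFilterSketch::literal)
        .collect(Collectors.joining(", "));
    return field + " IN (" + joined + ")";
  }
}
```

For example, `InFilterSketch.buildInFilter("id", Arrays.asList(1, 2, 3))` yields `id IN (1, 2, 3)`.
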
…e LanceSourceHandle

P1-A: Split LanceOptions God Object
- Extract LanceSourceOptions (read-side: path, batchSize, limit, columns, filter)
- Extract LanceSinkOptions (write-side: path, batchSize, writeMode, maxRowsPerFile)
- Extract LanceIndexOptions (index-building: indexType, metricType, PQ/HNSW params)
- Extract LanceVectorSearchOptions (vector-search: nprobes, ef, refineFactor)
- Add toSourceOptions/toSinkOptions/toIndexOptions/toVectorSearchOptions to LanceOptions
- All new classes are immutable with Builder pattern and validation

P1-B: Introduce LanceSourceHandle immutable object
- Carry push-down state (projection, filters, limit, aggregateInfo) as immutable snapshot
- Defensive copies for arrays and unmodifiable lists
- Add getHandle() and buildSourceOptions() to LanceDynamicTableSource

Tests: 43 new tests covering all config classes, conversion, and handle immutability

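
The "defensive copies for arrays and unmodifiable lists" idea in P1-B can be sketched as follows. `SourceHandleSketch` is a hypothetical stand-in, not the PR's LanceSourceHandle: it carries only a projection, a filter list and a limit, and the field types are assumptions.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical immutable snapshot of push-down state; not the PR's actual class. */
final class SourceHandleSketch {
  private final int[] projection;      // null means "no projection pushed down"
  private final List<String> filters;  // filter strings, assumed already rendered
  private final Long limit;            // null means "no limit"

  SourceHandleSketch(int[] projection, List<String> filters, Long limit) {
    // Copy on the way in so later changes by the caller cannot leak into the handle.
    this.projection = projection == null ? null : projection.clone();
    this.filters = Collections.unmodifiableList(new ArrayList<>(filters));
    this.limit = limit;
  }

  int[] getProjection() {
    // Copy on the way out so callers cannot mutate the internal array.
    return projection == null ? null : projection.clone();
  }

  List<String> getFilters() {
    return filters; // already unmodifiable
  }

  Long getLimit() {
    return limit;
  }

  /** "Mutation" returns a new handle and leaves this one untouched. */
  SourceHandleSketch withLimit(long newLimit) {
    return new SourceHandleSketch(projection, filters, newLimit);
  }
}
```

Because every change produces a new object, a planner can hand the same handle to several copies of a table source without one push-down step affecting another.
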
Supplementary explanation: the commit ID is 9bf9dd1
…ogic

P2-A: Unified LanceDatasetFactory
- Extract LanceDatasetFactory utility in config package
- Provides open(path, allocator) with path validation and error wrapping
- Provides openManaged(path) with auto-managed allocator lifecycle
- Provides createAllocator(), validatePath(), closeQuietly() helpers
- ManagedDataset implements Closeable for try-with-resources pattern

P2-B: Replace all 12 Dataset.open calls across 10 files
- LanceSource, LanceAggregateSource, LanceInputFormat (legacy sources)
- LanceSink (legacy sink)
- LanceSinkWriter, LanceSourceReader, LanceSplitEnumerator (V2 runtime)
- LanceIndexBuilder, LanceVectorSearch (utility classes)
- LanceCatalog (3 occurrences)
- Net reduction: 31 lines of duplicated boilerplate removed

Tests: 12 new tests for LanceDatasetFactory covering:
- Path validation (null, empty, valid)
- Allocator creation and cleanup
- Error handling for non-existent datasets
- closeQuietly null-safety

All 269 tests pass. Spotless + Checkstyle clean.

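
The managed-lifecycle pattern from P2-A (a Closeable wrapper that owns both the dataset and its allocator) might look roughly like the sketch below. It uses plain `AutoCloseable` stand-ins instead of the real Lance and Arrow types, so `ManagedDatasetSketch` and its constructor are hypothetical; closing the dataset before the allocator is an assumed ordering.

```java
import java.io.Closeable;

/** Hypothetical wrapper that owns a dataset and the allocator backing it. */
final class ManagedDatasetSketch implements Closeable {
  private final AutoCloseable dataset;   // stand-in for the opened dataset
  private final AutoCloseable allocator; // stand-in for the memory allocator

  ManagedDatasetSketch(AutoCloseable dataset, AutoCloseable allocator) {
    this.dataset = dataset;
    this.allocator = allocator;
  }

  /** Closes the dataset first, then the allocator it draws memory from (assumed order). */
  @Override
  public void close() {
    closeQuietly(dataset);
    closeQuietly(allocator);
  }

  /** Rejects null or blank paths before any native call is attempted. */
  static void validatePath(String path) {
    if (path == null || path.trim().isEmpty()) {
      throw new IllegalArgumentException("Dataset path must not be null or empty");
    }
  }

  /** Null-safe close that swallows failures, for cleanup paths only. */
  static void closeQuietly(AutoCloseable resource) {
    if (resource == null) {
      return;
    }
    try {
      resource.close();
    } catch (Exception e) {
      // Intentionally ignored: cleanup must not mask the original error.
    }
  }
}
```

With a wrapper like this, call sites can use `try (ManagedDatasetSketch ds = ...) { ... }` and no longer repeat the open/close boilerplate at each of the replaced call sites.
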
Supplementary explanation: the commit ID is 4a3ff7e
P3: Catalog Refactoring - Extract storage logic from LanceCatalog

New catalog subpackage: org.apache.flink.connector.lance.catalog

1. LanceCatalogPathResolver
   - Immutable path resolver for warehouse/database/table paths
   - Handles path normalization (trailing slash removal)
   - Detects remote storage (S3, GCS, Azure, HTTP/HTTPS)
   - resolveDatabasePath(), resolveTablePath()
2. LanceStorageProvider (interface)
   - Storage abstraction layer with 10 operations
   - initializeWarehouse, listDatabases, databaseExists, createDatabase
   - dropDatabase, listTables, tableExists, dropTable, renameTable
   - registerTable, configureEnvironment
3. LocalStorageProvider (implements LanceStorageProvider)
   - Local filesystem operations using java.nio.file
   - Lance dataset detection via _versions directory
   - Recursive directory deletion
4. RemoteStorageProvider (implements LanceStorageProvider)
   - In-memory ConcurrentHashMap registries for databases/tables
   - Lazy dataset existence probing via LanceDatasetFactory
   - Delegates to StorageEnvironmentManager for S3 credentials
5. StorageEnvironmentManager
   - Centralized S3/cloud credential management
   - Maps internal storage keys to AWS system properties
   - Maps internal keys to table-level connector options (toTableOptions)

LanceCatalog changes:
- Replaced all if(isRemoteStorage) branches with StorageProvider delegation
- Removed 7 private methods: configureStorageEnvironment, getDatabasePath, getDatasetPath, getStorageOptionsForTable, normalizeWarehousePath, isRemotePath, deleteDirectory
- Uses LanceDatasetFactory.createAllocator() / closeQuietly()
- Added getPathResolver() and getStorageProvider() for testability
- Net reduction: ~200 lines of mixed local/remote branching logic

Tests: 57 new tests across 4 test classes:
- LanceCatalogPathResolverTest: 18 tests (normalization, remote detection, resolution)
- StorageEnvironmentManagerTest: 7 tests (configure, toTableOptions)
- LocalStorageProviderTest: 16 tests (init, db ops, table ops, no-ops)
- RemoteStorageProviderTest: 16 tests (registry, env config, edge cases)

All 326 tests pass. Spotless + Checkstyle clean.

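
As an illustration of what the path resolver in item 1 is described as doing (trailing-slash normalization, remote-storage detection, database/table path resolution), here is a small sketch. `PathResolverSketch` is hypothetical: the scheme prefixes used for detection and the warehouse/database/table directory layout are assumptions, not the PR's actual rules.

```java
import java.util.Locale;

/** Hypothetical resolver; the scheme list and path layout are assumptions. */
final class PathResolverSketch {
  // Assumed URI prefixes for S3, GCS, Azure and HTTP(S) storage.
  private static final String[] REMOTE_SCHEMES = {
    "s3://", "s3a://", "gs://", "az://", "http://", "https://"
  };

  private final String warehouse;

  PathResolverSketch(String warehouse) {
    this.warehouse = normalize(warehouse);
  }

  /** Strips trailing slashes but keeps a bare "/" or a bare "scheme://" intact. */
  static String normalize(String path) {
    String p = path.trim();
    while (p.length() > 1 && p.endsWith("/") && !p.endsWith("://")) {
      p = p.substring(0, p.length() - 1);
    }
    return p;
  }

  /** Treats a path as remote when it starts with one of the known schemes. */
  static boolean isRemote(String path) {
    String lower = path.toLowerCase(Locale.ROOT);
    for (String scheme : REMOTE_SCHEMES) {
      if (lower.startsWith(scheme)) {
        return true;
      }
    }
    return false;
  }

  String resolveDatabasePath(String database) {
    return warehouse + "/" + database;
  }

  String resolveTablePath(String database, String table) {
    return resolveDatabasePath(database) + "/" + table;
  }
}
```

Normalizing once in the constructor means the resolve methods can concatenate with a single "/" and never produce doubled separators.
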
- LanceCatalog.createTable() now creates an empty Lance Dataset with the schema from CREATE TABLE DDL, persisting column info on disk
- LanceCatalog.getTable() merges user-provided table options (e.g. write.batch-size, write.mode) back into the returned CatalogTable
- LanceDynamicTableFactory: path is now optional (auto-injected by Catalog); declare S3 config options (s3-access-key, s3-secret-key, s3-region, s3-endpoint) as optional options
- LanceSinkWriter: support remote storage (S3/GCS/Azure) path existence checking via Dataset.open() probe instead of local Files.exists()
- Add 7 new integration tests covering CREATE TABLE lifecycle: dataset creation, user options preservation, duplicate detection, DROP TABLE, vector columns, custom databases, S3 options declaration

All 326 tests pass. Spotless + Checkstyle clean.

Based on the trino-lance plugin, the source and sink APIs of Flink-lance have been refactored.
[Figure: code architecture diagram]
Implementation details:
- Migrate Source from RichParallelSourceFunction to Source V2 API (FLIP-27)
- Migrate Sink from RichSinkFunction to Sink V2 API (FLIP-143)
- Add comprehensive unit tests
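
On the sink side, the commits mention auto batch flush plus a flush on checkpoint. The buffering logic behind that could be sketched as below, in plain Java without Flink on the classpath. `BufferingWriterSketch` and its `batchWriter` callback are hypothetical stand-ins; in a Sink V2 writer, `write` and `flush` would correspond to `SinkWriter#write` and `SinkWriter#flush`, and the callback would append the batch to the Lance dataset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical buffering writer; the callback stands in for the actual dataset write. */
final class BufferingWriterSketch<T> {
  private final int batchSize;
  private final Consumer<List<T>> batchWriter;
  private final List<T> buffer = new ArrayList<>();

  BufferingWriterSketch(int batchSize, Consumer<List<T>> batchWriter) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.batchWriter = batchWriter;
  }

  /** Buffers one row and flushes automatically once the batch is full. */
  void write(T row) {
    buffer.add(row);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Called on checkpoint or end of input so no buffered rows are left behind. */
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    // Hand over a copy so the callback never sees the buffer being cleared.
    batchWriter.accept(new ArrayList<>(buffer));
    buffer.clear();
  }
}
```

Flushing on every checkpoint is what keeps rows still sitting in the buffer from being lost on failure, at the cost of occasionally writing batches smaller than the configured size.
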