From 0f69c58377ffb3bd79003ff54c3237dd2e4abbcc Mon Sep 17 00:00:00 2001 From: Toby Cole Date: Wed, 18 Mar 2026 10:50:49 -0700 Subject: [PATCH 1/3] [core] Support custom table paths in JdbcCatalog Enable users to specify custom storage locations for individual tables via CoreOptions.PATH, replicating the pattern already used by HiveCatalog. The catalog-level warehouse becomes a default, but individual tables can live at arbitrary paths persisted in paimon_table_properties. Key changes: - JdbcUtils: add getTableProperty() to fetch a single table property - JdbcCatalog: override allowCustomTablePath(), getTableLocation() - createTableImpl: use initialTableLocation(), store path for custom tables - dropTableImpl: skip filesystem deletion for custom-path tables - renameTableImpl: skip filesystem rename for custom-path tables - alterTableImpl: preserve custom path across property refresh - dropDatabaseImpl/repairTable: always manage table properties Co-Authored-By: Claude Opus 4.6 --- .../org/apache/paimon/jdbc/JdbcCatalog.java | 193 ++++++++++++---- .../org/apache/paimon/jdbc/JdbcUtils.java | 51 +++++ .../apache/paimon/jdbc/JdbcCatalogTest.java | 212 ++++++++++++++++++ 3 files changed, 407 insertions(+), 49 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index f765e5f88db5..bb603ae432ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -232,14 +232,8 @@ protected void dropDatabaseImpl(String name) { execute(connections, JdbcUtils.DELETE_TABLES_SQL, catalogKey, name); // Delete properties from paimon_database_properties execute(connections, JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL, catalogKey, name); - // Delete table properties from paimon_table_properties - if (syncTableProperties()) { - execute( - connections, - JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_FOR_DB_SQL, - catalogKey, - name); - } + // Always delete table properties (needed for custom path entries) + execute(connections, JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_FOR_DB_SQL, catalogKey, name); } @Override @@ -293,6 +287,10 @@ protected List listTablesImpl(String databaseName) { @Override protected void dropTableImpl(Identifier identifier, List externalPaths) { try { + // Retrieve table location and custom-path flag BEFORE deleting JDBC metadata + Path path = getTableLocation(identifier); + boolean customPath = isCustomTablePath(identifier); + int deletedRecords = execute( connections, @@ -305,15 +303,20 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { LOG.info("Skipping drop, table does not exist: {}", identifier); return; } - if (syncTableProperties()) { - execute( - connections, - JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName()); + + // Always delete table properties (needed for custom path entries) + execute( + connections, + JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName()); + + // If custom path: skip filesystem deletion (external table keeps its data) + if (customPath) { + return; } - Path path = getTableLocation(identifier); + try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); @@ -334,12 +337,16 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { @Override protected void createTableImpl(Identifier identifier, Schema schema) { try { - // create table file - SchemaManager schemaManager = getSchemaManager(identifier); + // Determine table location before table exists in JDBC + Path tableLocation = initialTableLocation(schema.options(), identifier); + boolean externalTable = schema.options().containsKey(CoreOptions.PATH.key()); + + // create table file using the determined location + SchemaManager schemaManager = new SchemaManager(fileIO, tableLocation); TableSchema tableSchema = - runWithLock(identifier, () -> schemaManager.createTable(schema)); - // Update schema metadata - Path path = getTableLocation(identifier); + runWithLock(identifier, () -> schemaManager.createTable(schema, externalTable)); + + // Insert table record into paimon_tables if (JdbcUtils.insertTable( connections, catalogKey, @@ -348,22 +355,40 @@ protected void createTableImpl(Identifier identifier, Schema schema) { LOG.debug("Successfully committed to new table: {}", identifier); } else { try { - fileIO.deleteDirectoryQuietly(path); + if (!externalTable) { + fileIO.deleteDirectoryQuietly(tableLocation); + } } catch (Exception ee) { - LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee); + LOG.error( + "Delete directory[{}] fail for table {}", + tableLocation, + identifier, + ee); } throw new RuntimeException( String.format( "Failed to create table %s in catalog %s", identifier.getFullName(), catalogKey)); } + + // Always store path property for custom-path tables + Map propsToStore = new HashMap<>(); + if (externalTable) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } if (syncTableProperties()) { + Map tableProps = collectTableProperties(tableSchema); + // Avoid duplicate path entry + tableProps.remove(CoreOptions.PATH.key()); + propsToStore.putAll(tableProps); + } + if (!propsToStore.isEmpty()) { JdbcUtils.insertTableProperties( connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName(), - collectTableProperties(tableSchema)); + propsToStore); } } catch (Exception e) { throw new RuntimeException("Failed to create table " + identifier.getFullName(), e); @@ -373,24 +398,32 @@ protected void createTableImpl(Identifier identifier, Schema schema) { @Override protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { + // Check custom path BEFORE renaming metadata + boolean customPath = isCustomTablePath(fromTable); + Path fromPath = getTableLocation(fromTable); + // update table metadata info updateTable(connections, catalogKey, fromTable, toTable); - if (syncTableProperties()) { - execute( - connections, - JdbcUtils.RENAME_TABLE_PROPERTIES_SQL, - toTable.getDatabaseName(), - toTable.getObjectName(), - catalogKey, - fromTable.getDatabaseName(), - fromTable.getObjectName()); + + // Always update table properties (needed for custom path entries) + execute( + connections, + JdbcUtils.RENAME_TABLE_PROPERTIES_SQL, + toTable.getDatabaseName(), + toTable.getObjectName(), + catalogKey, + fromTable.getDatabaseName(), + fromTable.getObjectName()); + + // If custom path: skip filesystem rename (data stays at same location) + if (customPath) { + return; } - Path fromPath = getTableLocation(fromTable); if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) { // Rename the file system's table directory. Maintain consistency between tables in // the file system and tables in the Hive Metastore. - Path toPath = getTableLocation(toTable); + Path toPath = super.getTableLocation(toTable); try { fileIO.rename(fromPath, toPath); } catch (IOException e) { @@ -414,6 +447,15 @@ protected void alterTableImpl(Identifier identifier, List changes) try { runWithLock(identifier, () -> schemaManager.commitChanges(changes)); if (syncTableProperties()) { + // Save custom path before DELETE_ALL so we can re-insert it + Optional customPath = + JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + TableSchema updatedSchema = schemaManager.latest().get(); execute( connections, @@ -421,12 +463,18 @@ protected void alterTableImpl(Identifier identifier, List changes) catalogKey, identifier.getDatabaseName(), identifier.getTableName()); - JdbcUtils.insertTableProperties( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName(), - collectTableProperties(updatedSchema)); + + Map propsToStore = collectTableProperties(updatedSchema); + // Re-insert custom path if it was stored + customPath.ifPresent(p -> propsToStore.put(CoreOptions.PATH.key(), p)); + if (!propsToStore.isEmpty()) { + JdbcUtils.insertTableProperties( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + propsToStore); + } } } catch (TableNotExistException | ColumnAlreadyExistException @@ -451,6 +499,23 @@ protected TableSchema loadTableSchema(Identifier identifier) throws TableNotExis () -> new RuntimeException("There is no paimon table in " + tableLocation)); } + @Override + protected boolean allowCustomTablePath() { + return true; + } + + @Override + public Path getTableLocation(Identifier identifier) { + Optional storedPath = + JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + return storedPath.map(Path::new).orElse(super.getTableLocation(identifier)); + } + @Override public boolean caseSensitive() { return false; @@ -550,20 +615,33 @@ public void repairTable(Identifier identifier) throws TableNotExistException { LOG.error("Failed to repair table: {}", identifier); } } + + // Always clean and re-insert table properties during repair + execute( + connections, + JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName()); + + Map propsToStore = new HashMap<>(); + // Check if the table has a custom path (path from schema != default location) + Path defaultLocation = super.getTableLocation(identifier); + if (!tableLocation.equals(defaultLocation)) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } if (syncTableProperties()) { - // Delete existing properties and reinsert from filesystem schema - execute( - connections, - JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName()); + Map tableProps = collectTableProperties(tableSchema); + tableProps.remove(CoreOptions.PATH.key()); + propsToStore.putAll(tableProps); + } + if (!propsToStore.isEmpty()) { JdbcUtils.insertTableProperties( connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName(), - collectTableProperties(tableSchema)); + propsToStore); } } @@ -599,6 +677,23 @@ private Map collectTableProperties(TableSchema tableSchema) { return properties; } + private Path initialTableLocation(Map tableOptions, Identifier identifier) { + if (tableOptions.containsKey(CoreOptions.PATH.key())) { + return new Path(tableOptions.get(CoreOptions.PATH.key())); + } + return super.getTableLocation(identifier); + } + + private boolean isCustomTablePath(Identifier identifier) { + return JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()) + .isPresent(); + } + private SchemaManager getSchemaManager(Identifier identifier) { return new SchemaManager(fileIO, getTableLocation(identifier)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 52cf4224f2f7..805aa43d85a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -32,6 +32,7 @@ import java.sql.SQLIntegrityConstraintViolationException; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.function.Consumer; @@ -325,6 +326,21 @@ public class JdbcUtils { + TABLE_NAME + " = ? "; + static final String GET_TABLE_PROPERTY_SQL = + "SELECT " + + TABLE_PROPERTY_VALUE + + " FROM " + + TABLE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + TABLE_DATABASE + + " = ? AND " + + TABLE_NAME + + " = ? AND " + + TABLE_PROPERTY_KEY + + " = ?"; + // Distributed locks table static final String DISTRIBUTED_LOCKS_TABLE_NAME = "paimon_distributed_locks"; static final String LOCK_ID = "lock_id"; @@ -554,6 +570,41 @@ public static boolean insertTableProperties( insertedRecords, properties.size())); } + @SuppressWarnings("checkstyle:NestedTryDepth") + public static Optional getTableProperty( + JdbcClientPool connections, + String storeKey, + String databaseName, + String tableName, + String propertyKey) { + try { + return connections.run( + conn -> { + try (PreparedStatement ps = conn.prepareStatement(GET_TABLE_PROPERTY_SQL)) { + ps.setString(1, storeKey); + ps.setString(2, databaseName); + ps.setString(3, tableName); + ps.setString(4, propertyKey); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return Optional.of(rs.getString(TABLE_PROPERTY_VALUE)); + } + } + } + return Optional.empty(); + }); + } catch (SQLException e) { + throw new RuntimeException( + String.format( + "Failed to get table property '%s' for %s.%s", + propertyKey, databaseName, tableName), + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in SQL query", e); + } + } + private static String insertTablePropertiesStatement(int size) { StringBuilder sqlStatement = new StringBuilder(JdbcUtils.INSERT_TABLE_PROPERTIES_SQL); for (int i = 0; i < size; i++) { diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index 2b121dd4cdf7..d1b16f459b46 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -18,14 +18,17 @@ package org.apache.paimon.jdbc; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogTestBase; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -619,4 +622,213 @@ public void testInsertTableUtility() throws Exception { .isInstanceOf(RuntimeException.class) .hasMessageContaining("Failed to insert table"); } + + private Schema schemaWithCustomPath(String customPath) { + Map options = new HashMap<>(); + options.put(CoreOptions.PATH.key(), customPath); + return new Schema( + Lists.newArrayList( + new DataField(0, "pk", DataTypes.INT()), + new DataField(1, "col1", DataTypes.STRING())), + Collections.emptyList(), + Collections.emptyList(), + options, + ""); + } + + @Test + public void testCreateTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_location/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "custom_table"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify schema exists at custom location + Path customPath = new Path(customDir); + SchemaManager sm = new SchemaManager(fileIO, customPath); + assertThat(sm.listAllIds()).isNotEmpty(); + + // Verify getTableLocation returns the custom path + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(customPath); + + // Verify path is stored in JDBC table properties + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "custom_table"); + assertThat(storedProps).containsEntry(CoreOptions.PATH.key(), customPath.toString()); + + // Verify table is loadable + assertDoesNotThrow(() -> jdbcCatalog.getTable(identifier)); + } + + @Test + public void testCreateTableWithCustomPathSyncDisabled() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(false); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_nosync/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "nosync_table"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Path should still be stored even when sync is disabled + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "nosync_table"); + assertThat(storedProps) + .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); + + // Other properties should NOT be stored (sync disabled) + assertThat(storedProps).hasSize(1); + } + + @Test + public void testDropTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_drop/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "drop_custom"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify data exists at custom location + Path customPath = new Path(customDir); + assertThat(fileIO.exists(customPath)).isTrue(); + + // Drop the table + jdbcCatalog.dropTable(identifier, false); + + // Verify JDBC metadata is cleaned up + assertThatThrownBy(() -> jdbcCatalog.getTable(identifier)) + .isInstanceOf(Catalog.TableNotExistException.class); + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "drop_custom"); + assertThat(storedProps).isEmpty(); + + // Verify data is NOT deleted (external table keeps its data) + assertThat(fileIO.exists(customPath)).isTrue(); + } + + @Test + public void testDropTableWithDefaultPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + Identifier identifier = Identifier.create("test_db", "drop_default"); + jdbcCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + Path tablePath = jdbcCatalog.getTableLocation(identifier); + assertThat(fileIO.exists(tablePath)).isTrue(); + + jdbcCatalog.dropTable(identifier, false); + + // Data SHOULD be deleted for default-path tables + assertThat(fileIO.exists(tablePath)).isFalse(); + } + + @Test + public void testRenameTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_rename/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier fromTable = Identifier.create("test_db", "rename_from"); + + jdbcCatalog.createTable(fromTable, schema, false); + + Identifier toTable = Identifier.create("test_db", "rename_to"); + jdbcCatalog.renameTable(fromTable, toTable, false); + + // Verify old table is gone + assertThatThrownBy(() -> jdbcCatalog.getTable(fromTable)) + .isInstanceOf(Catalog.TableNotExistException.class); + + // Verify new table is accessible and still points to the custom path + assertThat(jdbcCatalog.getTableLocation(toTable)).isEqualTo(new Path(customDir)); + + // Verify the path property was moved + Map storedProps = fetchTableProperties(jdbcCatalog, "test_db", "rename_to"); + assertThat(storedProps) + .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); + + // Verify old name has no properties + Map oldProps = fetchTableProperties(jdbcCatalog, "test_db", "rename_from"); + assertThat(oldProps).isEmpty(); + } + + @Test + public void testAlterTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_alter/my_table"; + Map options = new HashMap<>(); + options.put(CoreOptions.PATH.key(), customDir); + options.put("bucket", "4"); + Schema schema = + new Schema( + Lists.newArrayList( + new DataField(0, "pk", DataTypes.INT()), + new DataField(1, "col1", DataTypes.STRING())), + Collections.emptyList(), + Lists.newArrayList("pk"), + options, + ""); + + Identifier identifier = Identifier.create("test_db", "alter_custom"); + jdbcCatalog.createTable(identifier, schema, false); + + // Alter: add a new option + jdbcCatalog.alterTable( + identifier, + Lists.newArrayList(SchemaChange.setOption("file.format", "orc")), + false); + + // Verify path is preserved after alter + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "alter_custom"); + assertThat(storedProps) + .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); + assertThat(storedProps).containsEntry("file.format", "orc"); + assertThat(storedProps).containsEntry("bucket", "4"); + + // Verify getTableLocation still returns custom path + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(new Path(customDir)); + } + + @Test + public void testLoadTableSchemaWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(false); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_load/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "load_custom"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify schema loads from custom location + Table table = jdbcCatalog.getTable(identifier); + assertThat(table).isNotNull(); + assertThat(table.name()).isEqualTo("load_custom"); + } + + @Test + public void testGetTableLocationFallback() throws Exception { + JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog; + jdbcCatalog.createDatabase("test_db", false); + + Identifier identifier = Identifier.create("test_db", "default_table"); + jdbcCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + // Verify getTableLocation returns default when no stored path + Path expected = new Path(new Path(warehouse, "test_db.db"), "default_table"); + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(expected); + } } From 35b9e81c1f045d1e7923588f3d785dfb3e028af5 Mon Sep 17 00:00:00 2001 From: Toby Cole Date: Wed, 18 Mar 2026 11:09:18 -0700 Subject: [PATCH 2/3] [core] Gate custom table path behind syncTableProperties Custom table path support is now only available when syncTableProperties is enabled, preserving the existing default behavior when sync is off. Co-Authored-By: Claude Opus 4.6 --- .../org/apache/paimon/jdbc/JdbcCatalog.java | 129 +++++++++--------- .../apache/paimon/jdbc/JdbcCatalogTest.java | 15 +- 2 files changed, 71 insertions(+), 73 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index bb603ae432ea..b7ad27ebfb50 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -232,8 +232,14 @@ protected void dropDatabaseImpl(String name) { execute(connections, JdbcUtils.DELETE_TABLES_SQL, catalogKey, name); // Delete properties from paimon_database_properties execute(connections, JdbcUtils.DELETE_ALL_DATABASE_PROPERTIES_SQL, catalogKey, name); - // Always delete table properties (needed for custom path entries) - execute(connections, JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_FOR_DB_SQL, catalogKey, name); + // Delete table properties from paimon_table_properties + if (syncTableProperties()) { + execute( + connections, + JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_FOR_DB_SQL, + catalogKey, + name); + } } @Override @@ -289,7 +295,7 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { try { // Retrieve table location and custom-path flag BEFORE deleting JDBC metadata Path path = getTableLocation(identifier); - boolean customPath = isCustomTablePath(identifier); + boolean customPath = syncTableProperties() && isCustomTablePath(identifier); int deletedRecords = execute( @@ -303,14 +309,14 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { LOG.info("Skipping drop, table does not exist: {}", identifier); return; } - - // Always delete table properties (needed for custom path entries) - execute( - connections, - JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName()); + if (syncTableProperties()) { + execute( + connections, + JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName()); + } // If custom path: skip filesystem deletion (external table keeps its data) if (customPath) { @@ -339,7 +345,8 @@ protected void createTableImpl(Identifier identifier, Schema schema) { try { // Determine table location before table exists in JDBC Path tableLocation = initialTableLocation(schema.options(), identifier); - boolean externalTable = schema.options().containsKey(CoreOptions.PATH.key()); + boolean externalTable = + syncTableProperties() && schema.options().containsKey(CoreOptions.PATH.key()); // create table file using the determined location SchemaManager schemaManager = new SchemaManager(fileIO, tableLocation); @@ -371,18 +378,14 @@ protected void createTableImpl(Identifier identifier, Schema schema) { identifier.getFullName(), catalogKey)); } - // Always store path property for custom-path tables - Map propsToStore = new HashMap<>(); - if (externalTable) { - propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); - } if (syncTableProperties()) { - Map tableProps = collectTableProperties(tableSchema); - // Avoid duplicate path entry - tableProps.remove(CoreOptions.PATH.key()); - propsToStore.putAll(tableProps); - } - if (!propsToStore.isEmpty()) { + Map propsToStore = collectTableProperties(tableSchema); + // Store custom path in table properties + if (externalTable) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } else { + propsToStore.remove(CoreOptions.PATH.key()); + } JdbcUtils.insertTableProperties( connections, catalogKey, @@ -399,21 +402,21 @@ protected void createTableImpl(Identifier identifier, Schema schema) { protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { // Check custom path BEFORE renaming metadata - boolean customPath = isCustomTablePath(fromTable); + boolean customPath = syncTableProperties() && isCustomTablePath(fromTable); Path fromPath = getTableLocation(fromTable); // update table metadata info updateTable(connections, catalogKey, fromTable, toTable); - - // Always update table properties (needed for custom path entries) - execute( - connections, - JdbcUtils.RENAME_TABLE_PROPERTIES_SQL, - toTable.getDatabaseName(), - toTable.getObjectName(), - catalogKey, - fromTable.getDatabaseName(), - fromTable.getObjectName()); + if (syncTableProperties()) { + execute( + connections, + JdbcUtils.RENAME_TABLE_PROPERTIES_SQL, + toTable.getDatabaseName(), + toTable.getObjectName(), + catalogKey, + fromTable.getDatabaseName(), + fromTable.getObjectName()); + } // If custom path: skip filesystem rename (data stays at same location) if (customPath) { @@ -501,19 +504,24 @@ protected TableSchema loadTableSchema(Identifier identifier) throws TableNotExis @Override protected boolean allowCustomTablePath() { - return true; + return syncTableProperties(); } @Override public Path getTableLocation(Identifier identifier) { - Optional storedPath = - JdbcUtils.getTableProperty( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName(), - CoreOptions.PATH.key()); - return storedPath.map(Path::new).orElse(super.getTableLocation(identifier)); + if (syncTableProperties()) { + Optional storedPath = + JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + if (storedPath.isPresent()) { + return new Path(storedPath.get()); + } + } + return super.getTableLocation(identifier); } @Override @@ -616,26 +624,23 @@ public void repairTable(Identifier identifier) throws TableNotExistException { } } - // Always clean and re-insert table properties during repair - execute( - connections, - JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName()); - - Map propsToStore = new HashMap<>(); - // Check if the table has a custom path (path from schema != default location) - Path defaultLocation = super.getTableLocation(identifier); - if (!tableLocation.equals(defaultLocation)) { - propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); - } if (syncTableProperties()) { - Map tableProps = collectTableProperties(tableSchema); - tableProps.remove(CoreOptions.PATH.key()); - propsToStore.putAll(tableProps); - } - if (!propsToStore.isEmpty()) { + // Delete existing properties and reinsert from filesystem schema + execute( + connections, + JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName()); + + Map propsToStore = collectTableProperties(tableSchema); + // Check if the table has a custom path (path from schema != default location) + Path defaultLocation = super.getTableLocation(identifier); + if (!tableLocation.equals(defaultLocation)) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } else { + propsToStore.remove(CoreOptions.PATH.key()); + } JdbcUtils.insertTableProperties( connections, catalogKey, diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index d1b16f459b46..6a785b9c0c8f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -673,16 +673,9 @@ public void testCreateTableWithCustomPathSyncDisabled() throws Exception { Schema schema = schemaWithCustomPath(customDir); Identifier identifier = Identifier.create("test_db", "nosync_table"); - jdbcCatalog.createTable(identifier, schema, false); - - // Path should still be stored even when sync is disabled - Map storedProps = - fetchTableProperties(jdbcCatalog, "test_db", "nosync_table"); - assertThat(storedProps) - .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); - - // Other properties should NOT be stored (sync disabled) - assertThat(storedProps).hasSize(1); + // Custom path requires syncTableProperties=true + assertThatThrownBy(() -> jdbcCatalog.createTable(identifier, schema, false)) + .isInstanceOf(UnsupportedOperationException.class); } @Test @@ -804,7 +797,7 @@ public void testAlterTableWithCustomPath() throws Exception { @Test public void testLoadTableSchemaWithCustomPath() throws Exception { - JdbcCatalog jdbcCatalog = initCatalogWithSync(false); + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); jdbcCatalog.createDatabase("test_db", false); String customDir = warehouse + "/custom_load/my_table"; From 5cadd0c84c9891c4b231b7f776c50b6d06ab8906 Mon Sep 17 00:00:00 2001 From: Toby Cole Date: Thu, 19 Mar 2026 04:03:37 -0700 Subject: [PATCH 3/3] [core] Fix redundant JDBC queries and document repair limitation - Consolidate getTableLocation + isCustomTablePath into a single fetchStoredPathIfSyncEnabled call in dropTableImpl and renameTableImpl - Document that repairTable cannot recover custom-path tables when all JDBC metadata is lost - Fix stale "Hive Metastore" comment in renameTableImpl Co-Authored-By: Claude Opus 4.6 --- .../org/apache/paimon/jdbc/JdbcCatalog.java | 39 ++++++++++++------- 1 file changed, 26 insertions(+), 13 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index b7ad27ebfb50..a1ff6714c7f5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -294,8 +294,9 @@ protected List listTablesImpl(String databaseName) { protected void dropTableImpl(Identifier identifier, List externalPaths) { try { // Retrieve table location and custom-path flag BEFORE deleting JDBC metadata - Path path = getTableLocation(identifier); - boolean customPath = syncTableProperties() && isCustomTablePath(identifier); + Optional storedPath = fetchStoredPathIfSyncEnabled(identifier); + Path path = storedPath.map(Path::new).orElse(super.getTableLocation(identifier)); + boolean customPath = storedPath.isPresent(); int deletedRecords = execute( @@ -402,8 +403,9 @@ protected void createTableImpl(Identifier identifier, Schema schema) { protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { // Check custom path BEFORE renaming metadata - boolean customPath = syncTableProperties() && isCustomTablePath(fromTable); - Path fromPath = getTableLocation(fromTable); + Optional storedPath = fetchStoredPathIfSyncEnabled(fromTable); + boolean customPath = storedPath.isPresent(); + Path fromPath = storedPath.map(Path::new).orElse(super.getTableLocation(fromTable)); // update table metadata info updateTable(connections, catalogKey, fromTable, toTable); @@ -425,7 +427,7 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) { // Rename the file system's table directory. Maintain consistency between tables in - // the file system and tables in the Hive Metastore. + // the file system and tables in the JDBC catalog. Path toPath = super.getTableLocation(toTable); try { fileIO.rename(fromPath, toPath); @@ -595,6 +597,15 @@ public void repairDatabase(String databaseName) { } } + /** + * Repair a table by re-syncing JDBC metadata from the filesystem schema. + * + *

Note: Tables created with a custom path ({@code CoreOptions.PATH}) cannot be fully + * repaired if both the {@code paimon_tables} row and the {@code paimon_table_properties} rows + * are lost. In that case, {@code getTableLocation} falls back to the default warehouse path + * where no schema exists, and repair will throw {@link TableNotExistException}. To recover, + * re-create the table pointing to the original custom path. + */ @Override public void repairTable(Identifier identifier) throws TableNotExistException { checkNotBranch(identifier, "repairTable"); @@ -689,14 +700,16 @@ private Path initialTableLocation(Map tableOptions, Identifier i return super.getTableLocation(identifier); } - private boolean isCustomTablePath(Identifier identifier) { - return JdbcUtils.getTableProperty( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName(), - CoreOptions.PATH.key()) - .isPresent(); + private Optional fetchStoredPathIfSyncEnabled(Identifier identifier) { + if (syncTableProperties()) { + return JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + } + return Optional.empty(); } private SchemaManager getSchemaManager(Identifier identifier) {