diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index f765e5f88db5..a1ff6714c7f5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -293,6 +293,11 @@ protected List listTablesImpl(String databaseName) { @Override protected void dropTableImpl(Identifier identifier, List externalPaths) { try { + // Retrieve table location and custom-path flag BEFORE deleting JDBC metadata + Optional storedPath = fetchStoredPathIfSyncEnabled(identifier); + Path path = storedPath.map(Path::new).orElse(super.getTableLocation(identifier)); + boolean customPath = storedPath.isPresent(); + int deletedRecords = execute( connections, @@ -313,7 +318,12 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { identifier.getDatabaseName(), identifier.getTableName()); } - Path path = getTableLocation(identifier); + + // If custom path: skip filesystem deletion (external table keeps its data) + if (customPath) { + return; + } + try { if (fileIO.exists(path)) { fileIO.deleteDirectoryQuietly(path); @@ -334,12 +344,17 @@ protected void dropTableImpl(Identifier identifier, List externalPaths) { @Override protected void createTableImpl(Identifier identifier, Schema schema) { try { - // create table file - SchemaManager schemaManager = getSchemaManager(identifier); + // Determine table location before table exists in JDBC + Path tableLocation = initialTableLocation(schema.options(), identifier); + boolean externalTable = + syncTableProperties() && schema.options().containsKey(CoreOptions.PATH.key()); + + // create table file using the determined location + SchemaManager schemaManager = new SchemaManager(fileIO, tableLocation); TableSchema tableSchema = - runWithLock(identifier, () -> schemaManager.createTable(schema)); - // Update schema metadata - Path path = getTableLocation(identifier); + runWithLock(identifier, () -> schemaManager.createTable(schema, externalTable)); + + // Insert table record into paimon_tables if (JdbcUtils.insertTable( connections, catalogKey, @@ -348,22 +363,36 @@ protected void createTableImpl(Identifier identifier, Schema schema) { LOG.debug("Successfully committed to new table: {}", identifier); } else { try { - fileIO.deleteDirectoryQuietly(path); + if (!externalTable) { + fileIO.deleteDirectoryQuietly(tableLocation); + } } catch (Exception ee) { - LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee); + LOG.error( + "Delete directory[{}] fail for table {}", + tableLocation, + identifier, + ee); } throw new RuntimeException( String.format( "Failed to create table %s in catalog %s", identifier.getFullName(), catalogKey)); } + if (syncTableProperties()) { + Map propsToStore = collectTableProperties(tableSchema); + // Store custom path in table properties + if (externalTable) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } else { + propsToStore.remove(CoreOptions.PATH.key()); + } JdbcUtils.insertTableProperties( connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName(), - collectTableProperties(tableSchema)); + propsToStore); } } catch (Exception e) { throw new RuntimeException("Failed to create table " + identifier.getFullName(), e); @@ -373,6 +402,11 @@ protected void createTableImpl(Identifier identifier, Schema schema) { @Override protected void renameTableImpl(Identifier fromTable, Identifier toTable) { try { + // Check custom path BEFORE renaming metadata + Optional storedPath = fetchStoredPathIfSyncEnabled(fromTable); + boolean customPath = storedPath.isPresent(); + Path fromPath = storedPath.map(Path::new).orElse(super.getTableLocation(fromTable)); + // update table metadata info updateTable(connections, catalogKey, fromTable, toTable); if (syncTableProperties()) { @@ -386,11 +420,15 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) { fromTable.getObjectName()); } - Path fromPath = getTableLocation(fromTable); + // If custom path: skip filesystem rename (data stays at same location) + if (customPath) { + return; + } + if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) { // Rename the file system's table directory. Maintain consistency between tables in - // the file system and tables in the Hive Metastore. - Path toPath = getTableLocation(toTable); + // the file system and tables in the JDBC catalog. + Path toPath = super.getTableLocation(toTable); try { fileIO.rename(fromPath, toPath); } catch (IOException e) { @@ -414,6 +452,15 @@ protected void alterTableImpl(Identifier identifier, List changes) try { runWithLock(identifier, () -> schemaManager.commitChanges(changes)); if (syncTableProperties()) { + // Save custom path before DELETE_ALL so we can re-insert it + Optional customPath = + JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + TableSchema updatedSchema = schemaManager.latest().get(); execute( connections, @@ -421,12 +468,18 @@ protected void alterTableImpl(Identifier identifier, List changes) catalogKey, identifier.getDatabaseName(), identifier.getTableName()); - JdbcUtils.insertTableProperties( - connections, - catalogKey, - identifier.getDatabaseName(), - identifier.getTableName(), - collectTableProperties(updatedSchema)); + + Map propsToStore = collectTableProperties(updatedSchema); + // Re-insert custom path if it was stored + customPath.ifPresent(p -> propsToStore.put(CoreOptions.PATH.key(), p)); + if (!propsToStore.isEmpty()) { + JdbcUtils.insertTableProperties( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + propsToStore); + } } } catch (TableNotExistException | ColumnAlreadyExistException @@ -451,6 +504,28 @@ protected TableSchema loadTableSchema(Identifier identifier) throws TableNotExis () -> new RuntimeException("There is no paimon table in " + tableLocation)); } + @Override + protected boolean allowCustomTablePath() { + return syncTableProperties(); + } + + @Override + public Path getTableLocation(Identifier identifier) { + if (syncTableProperties()) { + Optional storedPath = + JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + if (storedPath.isPresent()) { + return new Path(storedPath.get()); + } + } + return super.getTableLocation(identifier); + } + @Override public boolean caseSensitive() { return false; @@ -522,6 +597,15 @@ public void repairDatabase(String databaseName) { } } + /** + * Repair a table by re-syncing JDBC metadata from the filesystem schema. + * + *

Note: Tables created with a custom path ({@code CoreOptions.PATH}) cannot be fully + * repaired if both the {@code paimon_tables} row and the {@code paimon_table_properties} rows + * are lost. In that case, {@code getTableLocation} falls back to the default warehouse path + * where no schema exists, and repair will throw {@link TableNotExistException}. To recover, + * re-create the table pointing to the original custom path. + */ @Override public void repairTable(Identifier identifier) throws TableNotExistException { checkNotBranch(identifier, "repairTable"); @@ -550,6 +634,7 @@ public void repairTable(Identifier identifier) throws TableNotExistException { LOG.error("Failed to repair table: {}", identifier); } } + if (syncTableProperties()) { // Delete existing properties and reinsert from filesystem schema execute( @@ -558,12 +643,21 @@ public void repairTable(Identifier identifier) throws TableNotExistException { catalogKey, identifier.getDatabaseName(), identifier.getTableName()); + + Map propsToStore = collectTableProperties(tableSchema); + // Check if the table has a custom path (path from schema != default location) + Path defaultLocation = super.getTableLocation(identifier); + if (!tableLocation.equals(defaultLocation)) { + propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString()); + } else { + propsToStore.remove(CoreOptions.PATH.key()); + } JdbcUtils.insertTableProperties( connections, catalogKey, identifier.getDatabaseName(), identifier.getTableName(), - collectTableProperties(tableSchema)); + propsToStore); } } @@ -599,6 +693,25 @@ private Map collectTableProperties(TableSchema tableSchema) { return properties; } + private Path initialTableLocation(Map tableOptions, Identifier identifier) { + if (tableOptions.containsKey(CoreOptions.PATH.key())) { + return new Path(tableOptions.get(CoreOptions.PATH.key())); + } + return super.getTableLocation(identifier); + } + + private Optional fetchStoredPathIfSyncEnabled(Identifier identifier) { + if (syncTableProperties()) { + return JdbcUtils.getTableProperty( + connections, + catalogKey, + identifier.getDatabaseName(), + identifier.getTableName(), + CoreOptions.PATH.key()); + } + return Optional.empty(); + } + private SchemaManager getSchemaManager(Identifier identifier) { return new SchemaManager(fileIO, getTableLocation(identifier)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java index 52cf4224f2f7..805aa43d85a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java @@ -32,6 +32,7 @@ import java.sql.SQLIntegrityConstraintViolationException; import java.util.Collections; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.function.Consumer; @@ -325,6 +326,21 @@ public class JdbcUtils { + TABLE_NAME + " = ? "; + static final String GET_TABLE_PROPERTY_SQL = + "SELECT " + + TABLE_PROPERTY_VALUE + + " FROM " + + TABLE_PROPERTIES_TABLE_NAME + + " WHERE " + + CATALOG_KEY + + " = ? AND " + + TABLE_DATABASE + + " = ? AND " + + TABLE_NAME + + " = ? AND " + + TABLE_PROPERTY_KEY + + " = ?"; + // Distributed locks table static final String DISTRIBUTED_LOCKS_TABLE_NAME = "paimon_distributed_locks"; static final String LOCK_ID = "lock_id"; @@ -554,6 +570,41 @@ public static boolean insertTableProperties( insertedRecords, properties.size())); } + @SuppressWarnings("checkstyle:NestedTryDepth") + public static Optional getTableProperty( + JdbcClientPool connections, + String storeKey, + String databaseName, + String tableName, + String propertyKey) { + try { + return connections.run( + conn -> { + try (PreparedStatement ps = conn.prepareStatement(GET_TABLE_PROPERTY_SQL)) { + ps.setString(1, storeKey); + ps.setString(2, databaseName); + ps.setString(3, tableName); + ps.setString(4, propertyKey); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return Optional.of(rs.getString(TABLE_PROPERTY_VALUE)); + } + } + } + return Optional.empty(); + }); + } catch (SQLException e) { + throw new RuntimeException( + String.format( + "Failed to get table property '%s' for %s.%s", + propertyKey, databaseName, tableName), + e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted in SQL query", e); + } + } + private static String insertTablePropertiesStatement(int size) { StringBuilder sqlStatement = new StringBuilder(JdbcUtils.INSERT_TABLE_PROPERTIES_SQL); for (int i = 0; i < size; i++) { diff --git a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java index 2b121dd4cdf7..6a785b9c0c8f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java @@ -18,14 +18,17 @@ package org.apache.paimon.jdbc; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.CatalogTestBase; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaChange; +import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.table.Table; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypes; @@ -619,4 +622,206 @@ public void testInsertTableUtility() throws Exception { .isInstanceOf(RuntimeException.class) .hasMessageContaining("Failed to insert table"); } + + private Schema schemaWithCustomPath(String customPath) { + Map options = new HashMap<>(); + options.put(CoreOptions.PATH.key(), customPath); + return new Schema( + Lists.newArrayList( + new DataField(0, "pk", DataTypes.INT()), + new DataField(1, "col1", DataTypes.STRING())), + Collections.emptyList(), + Collections.emptyList(), + options, + ""); + } + + @Test + public void testCreateTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_location/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "custom_table"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify schema exists at custom location + Path customPath = new Path(customDir); + SchemaManager sm = new SchemaManager(fileIO, customPath); + assertThat(sm.listAllIds()).isNotEmpty(); + + // Verify getTableLocation returns the custom path + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(customPath); + + // Verify path is stored in JDBC table properties + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "custom_table"); + assertThat(storedProps).containsEntry(CoreOptions.PATH.key(), customPath.toString()); + + // Verify table is loadable + assertDoesNotThrow(() -> jdbcCatalog.getTable(identifier)); + } + + @Test + public void testCreateTableWithCustomPathSyncDisabled() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(false); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_nosync/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "nosync_table"); + + // Custom path requires syncTableProperties=true + assertThatThrownBy(() -> jdbcCatalog.createTable(identifier, schema, false)) + .isInstanceOf(UnsupportedOperationException.class); + } + + @Test + public void testDropTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_drop/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "drop_custom"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify data exists at custom location + Path customPath = new Path(customDir); + assertThat(fileIO.exists(customPath)).isTrue(); + + // Drop the table + jdbcCatalog.dropTable(identifier, false); + + // Verify JDBC metadata is cleaned up + assertThatThrownBy(() -> jdbcCatalog.getTable(identifier)) + .isInstanceOf(Catalog.TableNotExistException.class); + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "drop_custom"); + assertThat(storedProps).isEmpty(); + + // Verify data is NOT deleted (external table keeps its data) + assertThat(fileIO.exists(customPath)).isTrue(); + } + + @Test + public void testDropTableWithDefaultPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + Identifier identifier = Identifier.create("test_db", "drop_default"); + jdbcCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + Path tablePath = jdbcCatalog.getTableLocation(identifier); + assertThat(fileIO.exists(tablePath)).isTrue(); + + jdbcCatalog.dropTable(identifier, false); + + // Data SHOULD be deleted for default-path tables + assertThat(fileIO.exists(tablePath)).isFalse(); + } + + @Test + public void testRenameTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_rename/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier fromTable = Identifier.create("test_db", "rename_from"); + + jdbcCatalog.createTable(fromTable, schema, false); + + Identifier toTable = Identifier.create("test_db", "rename_to"); + jdbcCatalog.renameTable(fromTable, toTable, false); + + // Verify old table is gone + assertThatThrownBy(() -> jdbcCatalog.getTable(fromTable)) + .isInstanceOf(Catalog.TableNotExistException.class); + + // Verify new table is accessible and still points to the custom path + assertThat(jdbcCatalog.getTableLocation(toTable)).isEqualTo(new Path(customDir)); + + // Verify the path property was moved + Map storedProps = fetchTableProperties(jdbcCatalog, "test_db", "rename_to"); + assertThat(storedProps) + .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); + + // Verify old name has no properties + Map oldProps = fetchTableProperties(jdbcCatalog, "test_db", "rename_from"); + assertThat(oldProps).isEmpty(); + } + + @Test + public void testAlterTableWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_alter/my_table"; + Map options = new HashMap<>(); + options.put(CoreOptions.PATH.key(), customDir); + options.put("bucket", "4"); + Schema schema = + new Schema( + Lists.newArrayList( + new DataField(0, "pk", DataTypes.INT()), + new DataField(1, "col1", DataTypes.STRING())), + Collections.emptyList(), + Lists.newArrayList("pk"), + options, + ""); + + Identifier identifier = Identifier.create("test_db", "alter_custom"); + jdbcCatalog.createTable(identifier, schema, false); + + // Alter: add a new option + jdbcCatalog.alterTable( + identifier, + Lists.newArrayList(SchemaChange.setOption("file.format", "orc")), + false); + + // Verify path is preserved after alter + Map storedProps = + fetchTableProperties(jdbcCatalog, "test_db", "alter_custom"); + assertThat(storedProps) + .containsEntry(CoreOptions.PATH.key(), new Path(customDir).toString()); + assertThat(storedProps).containsEntry("file.format", "orc"); + assertThat(storedProps).containsEntry("bucket", "4"); + + // Verify getTableLocation still returns custom path + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(new Path(customDir)); + } + + @Test + public void testLoadTableSchemaWithCustomPath() throws Exception { + JdbcCatalog jdbcCatalog = initCatalogWithSync(true); + jdbcCatalog.createDatabase("test_db", false); + + String customDir = warehouse + "/custom_load/my_table"; + Schema schema = schemaWithCustomPath(customDir); + Identifier identifier = Identifier.create("test_db", "load_custom"); + + jdbcCatalog.createTable(identifier, schema, false); + + // Verify schema loads from custom location + Table table = jdbcCatalog.getTable(identifier); + assertThat(table).isNotNull(); + assertThat(table.name()).isEqualTo("load_custom"); + } + + @Test + public void testGetTableLocationFallback() throws Exception { + JdbcCatalog jdbcCatalog = (JdbcCatalog) catalog; + jdbcCatalog.createDatabase("test_db", false); + + Identifier identifier = Identifier.create("test_db", "default_table"); + jdbcCatalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false); + + // Verify getTableLocation returns default when no stored path + Path expected = new Path(new Path(warehouse, "test_db.db"), "default_table"); + assertThat(jdbcCatalog.getTableLocation(identifier)).isEqualTo(expected); + } }