Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 132 additions & 19 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ protected List<String> listTablesImpl(String databaseName) {
@Override
protected void dropTableImpl(Identifier identifier, List<Path> externalPaths) {
try {
// Retrieve table location and custom-path flag BEFORE deleting JDBC metadata
Optional<String> storedPath = fetchStoredPathIfSyncEnabled(identifier);
Path path = storedPath.map(Path::new).orElse(super.getTableLocation(identifier));
boolean customPath = storedPath.isPresent();

int deletedRecords =
execute(
connections,
Expand All @@ -313,7 +318,12 @@ protected void dropTableImpl(Identifier identifier, List<Path> externalPaths) {
identifier.getDatabaseName(),
identifier.getTableName());
}
Path path = getTableLocation(identifier);

// If custom path: skip filesystem deletion (external table keeps its data)
if (customPath) {
return;
}

try {
if (fileIO.exists(path)) {
fileIO.deleteDirectoryQuietly(path);
Expand All @@ -334,12 +344,17 @@ protected void dropTableImpl(Identifier identifier, List<Path> externalPaths) {
@Override
protected void createTableImpl(Identifier identifier, Schema schema) {
try {
// create table file
SchemaManager schemaManager = getSchemaManager(identifier);
// Determine table location before table exists in JDBC
Path tableLocation = initialTableLocation(schema.options(), identifier);
boolean externalTable =
syncTableProperties() && schema.options().containsKey(CoreOptions.PATH.key());

// create table file using the determined location
SchemaManager schemaManager = new SchemaManager(fileIO, tableLocation);
TableSchema tableSchema =
runWithLock(identifier, () -> schemaManager.createTable(schema));
// Update schema metadata
Path path = getTableLocation(identifier);
runWithLock(identifier, () -> schemaManager.createTable(schema, externalTable));

// Insert table record into paimon_tables
if (JdbcUtils.insertTable(
connections,
catalogKey,
Expand All @@ -348,22 +363,36 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
LOG.debug("Successfully committed to new table: {}", identifier);
} else {
try {
fileIO.deleteDirectoryQuietly(path);
if (!externalTable) {
fileIO.deleteDirectoryQuietly(tableLocation);
}
} catch (Exception ee) {
LOG.error("Delete directory[{}] fail for table {}", path, identifier, ee);
LOG.error(
"Delete directory[{}] fail for table {}",
tableLocation,
identifier,
ee);
}
throw new RuntimeException(
String.format(
"Failed to create table %s in catalog %s",
identifier.getFullName(), catalogKey));
}

if (syncTableProperties()) {
Map<String, String> propsToStore = collectTableProperties(tableSchema);
// Store custom path in table properties
if (externalTable) {
propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString());
} else {
propsToStore.remove(CoreOptions.PATH.key());
}
JdbcUtils.insertTableProperties(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
collectTableProperties(tableSchema));
propsToStore);
}
} catch (Exception e) {
throw new RuntimeException("Failed to create table " + identifier.getFullName(), e);
Expand All @@ -373,6 +402,11 @@ protected void createTableImpl(Identifier identifier, Schema schema) {
@Override
protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
try {
// Check custom path BEFORE renaming metadata
Optional<String> storedPath = fetchStoredPathIfSyncEnabled(fromTable);
boolean customPath = storedPath.isPresent();
Path fromPath = storedPath.map(Path::new).orElse(super.getTableLocation(fromTable));

// update table metadata info
updateTable(connections, catalogKey, fromTable, toTable);
if (syncTableProperties()) {
Expand All @@ -386,11 +420,15 @@ protected void renameTableImpl(Identifier fromTable, Identifier toTable) {
fromTable.getObjectName());
}

Path fromPath = getTableLocation(fromTable);
// If custom path: skip filesystem rename (data stays at same location)
if (customPath) {
return;
}

if (!new SchemaManager(fileIO, fromPath).listAllIds().isEmpty()) {
// Rename the file system's table directory. Maintain consistency between tables in
// the file system and tables in the Hive Metastore.
Path toPath = getTableLocation(toTable);
// the file system and tables in the JDBC catalog.
Path toPath = super.getTableLocation(toTable);
try {
fileIO.rename(fromPath, toPath);
} catch (IOException e) {
Expand All @@ -414,19 +452,34 @@ protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
try {
runWithLock(identifier, () -> schemaManager.commitChanges(changes));
if (syncTableProperties()) {
// Save custom path before DELETE_ALL so we can re-insert it
Optional<String> customPath =
JdbcUtils.getTableProperty(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
CoreOptions.PATH.key());

TableSchema updatedSchema = schemaManager.latest().get();
execute(
connections,
JdbcUtils.DELETE_ALL_TABLE_PROPERTIES_SQL,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName());
JdbcUtils.insertTableProperties(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
collectTableProperties(updatedSchema));

Map<String, String> propsToStore = collectTableProperties(updatedSchema);
// Re-insert custom path if it was stored
customPath.ifPresent(p -> propsToStore.put(CoreOptions.PATH.key(), p));
if (!propsToStore.isEmpty()) {
JdbcUtils.insertTableProperties(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
propsToStore);
}
}
} catch (TableNotExistException
| ColumnAlreadyExistException
Expand All @@ -451,6 +504,28 @@ protected TableSchema loadTableSchema(Identifier identifier) throws TableNotExis
() -> new RuntimeException("There is no paimon table in " + tableLocation));
}

@Override
protected boolean allowCustomTablePath() {
return syncTableProperties();
}

@Override
public Path getTableLocation(Identifier identifier) {
if (syncTableProperties()) {
Optional<String> storedPath =
JdbcUtils.getTableProperty(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
CoreOptions.PATH.key());
if (storedPath.isPresent()) {
return new Path(storedPath.get());
}
}
return super.getTableLocation(identifier);
}

@Override
public boolean caseSensitive() {
return false;
Expand Down Expand Up @@ -522,6 +597,15 @@ public void repairDatabase(String databaseName) {
}
}

/**
* Repair a table by re-syncing JDBC metadata from the filesystem schema.
*
* <p>Note: Tables created with a custom path ({@code CoreOptions.PATH}) cannot be fully
* repaired if both the {@code paimon_tables} row and the {@code paimon_table_properties} rows
* are lost. In that case, {@code getTableLocation} falls back to the default warehouse path
* where no schema exists, and repair will throw {@link TableNotExistException}. To recover,
* re-create the table pointing to the original custom path.
*/
@Override
public void repairTable(Identifier identifier) throws TableNotExistException {
checkNotBranch(identifier, "repairTable");
Expand Down Expand Up @@ -550,6 +634,7 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
LOG.error("Failed to repair table: {}", identifier);
}
}

if (syncTableProperties()) {
// Delete existing properties and reinsert from filesystem schema
execute(
Expand All @@ -558,12 +643,21 @@ public void repairTable(Identifier identifier) throws TableNotExistException {
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName());

Map<String, String> propsToStore = collectTableProperties(tableSchema);
// Check if the table has a custom path (path from schema != default location)
Path defaultLocation = super.getTableLocation(identifier);
if (!tableLocation.equals(defaultLocation)) {
propsToStore.put(CoreOptions.PATH.key(), tableLocation.toString());
} else {
propsToStore.remove(CoreOptions.PATH.key());
}
JdbcUtils.insertTableProperties(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
collectTableProperties(tableSchema));
propsToStore);
}
}

Expand Down Expand Up @@ -599,6 +693,25 @@ private Map<String, String> collectTableProperties(TableSchema tableSchema) {
return properties;
}

private Path initialTableLocation(Map<String, String> tableOptions, Identifier identifier) {
if (tableOptions.containsKey(CoreOptions.PATH.key())) {
return new Path(tableOptions.get(CoreOptions.PATH.key()));
}
return super.getTableLocation(identifier);
}

private Optional<String> fetchStoredPathIfSyncEnabled(Identifier identifier) {
if (syncTableProperties()) {
return JdbcUtils.getTableProperty(
connections,
catalogKey,
identifier.getDatabaseName(),
identifier.getTableName(),
CoreOptions.PATH.key());
}
return Optional.empty();
}

private SchemaManager getSchemaManager(Identifier identifier) {
return new SchemaManager(fileIO, getTableLocation(identifier));
}
Expand Down
51 changes: 51 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
Expand Down Expand Up @@ -325,6 +326,21 @@ public class JdbcUtils {
+ TABLE_NAME
+ " = ? ";

static final String GET_TABLE_PROPERTY_SQL =
"SELECT "
+ TABLE_PROPERTY_VALUE
+ " FROM "
+ TABLE_PROPERTIES_TABLE_NAME
+ " WHERE "
+ CATALOG_KEY
+ " = ? AND "
+ TABLE_DATABASE
+ " = ? AND "
+ TABLE_NAME
+ " = ? AND "
+ TABLE_PROPERTY_KEY
+ " = ?";

// Distributed locks table
static final String DISTRIBUTED_LOCKS_TABLE_NAME = "paimon_distributed_locks";
static final String LOCK_ID = "lock_id";
Expand Down Expand Up @@ -554,6 +570,41 @@ public static boolean insertTableProperties(
insertedRecords, properties.size()));
}

@SuppressWarnings("checkstyle:NestedTryDepth")
public static Optional<String> getTableProperty(
JdbcClientPool connections,
String storeKey,
String databaseName,
String tableName,
String propertyKey) {
try {
return connections.run(
conn -> {
try (PreparedStatement ps = conn.prepareStatement(GET_TABLE_PROPERTY_SQL)) {
ps.setString(1, storeKey);
ps.setString(2, databaseName);
ps.setString(3, tableName);
ps.setString(4, propertyKey);
try (ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
return Optional.of(rs.getString(TABLE_PROPERTY_VALUE));
}
}
}
return Optional.<String>empty();
});
} catch (SQLException e) {
throw new RuntimeException(
String.format(
"Failed to get table property '%s' for %s.%s",
propertyKey, databaseName, tableName),
e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted in SQL query", e);
}
}

private static String insertTablePropertiesStatement(int size) {
StringBuilder sqlStatement = new StringBuilder(JdbcUtils.INSERT_TABLE_PROPERTIES_SQL);
for (int i = 0; i < size; i++) {
Expand Down
Loading