Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,21 @@ public class CatalogContext implements Serializable {
private final SerializableConfiguration hadoopConf;
@Nullable private final FileIOLoader preferIOLoader;
@Nullable private final FileIOLoader fallbackIOLoader;
@Nullable private final String catalogName;

private CatalogContext(
Options options,
@Nullable Configuration hadoopConf,
@Nullable FileIOLoader preferIOLoader,
@Nullable FileIOLoader fallbackIOLoader) {
@Nullable FileIOLoader fallbackIOLoader,
@Nullable String catalogName) {
this.options = checkNotNull(options);
this.hadoopConf =
new SerializableConfiguration(
hadoopConf == null ? getHadoopConfiguration(options) : hadoopConf);
this.preferIOLoader = preferIOLoader;
this.fallbackIOLoader = fallbackIOLoader;
this.catalogName = catalogName;
}

public static CatalogContext create(Path warehouse) {
Expand All @@ -69,28 +72,43 @@ public static CatalogContext create(Path warehouse) {
}

public static CatalogContext create(Options options) {
return new CatalogContext(options, null, null, null);
return new CatalogContext(options, null, null, null, null);
}

public static CatalogContext create(Options options, Configuration hadoopConf) {
return new CatalogContext(options, hadoopConf, null, null);
return new CatalogContext(options, hadoopConf, null, null, null);
}

public static CatalogContext create(
Options options, Configuration hadoopConf, @Nullable String catalogName) {
return new CatalogContext(options, hadoopConf, null, null, catalogName);
}

public static CatalogContext create(Options options, FileIOLoader fallbackIOLoader) {
return new CatalogContext(options, null, null, fallbackIOLoader);
return new CatalogContext(options, null, null, fallbackIOLoader, null);
}

public static CatalogContext create(
Options options, FileIOLoader preferIOLoader, FileIOLoader fallbackIOLoader) {
return new CatalogContext(options, null, preferIOLoader, fallbackIOLoader);
return new CatalogContext(options, null, preferIOLoader, fallbackIOLoader, null);
}

public static CatalogContext create(
Options options,
FileIOLoader preferIOLoader,
FileIOLoader fallbackIOLoader,
@Nullable String catalogName) {
return new CatalogContext(options, null, preferIOLoader, fallbackIOLoader, catalogName);
}

public static CatalogContext create(
Options options,
Configuration hadoopConf,
FileIOLoader preferIOLoader,
FileIOLoader fallbackIOLoader) {
return new CatalogContext(options, hadoopConf, preferIOLoader, fallbackIOLoader);
FileIOLoader fallbackIOLoader,
@Nullable String catalogName) {
return new CatalogContext(
options, hadoopConf, preferIOLoader, fallbackIOLoader, catalogName);
}

public Options options() {
Expand All @@ -111,4 +129,9 @@ public FileIOLoader preferIO() {
public FileIOLoader fallbackIO() {
return fallbackIOLoader;
}

@Nullable
public String catalogName() {
return catalogName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public void configure(CatalogContext context) {
options.set(RESOLVING_FILE_IO_ENABLED, false);
this.context =
CatalogContext.create(
options, context.hadoopConf(), context.preferIO(), context.fallbackIO());
options,
context.hadoopConf(),
context.preferIO(),
context.fallbackIO(),
context.catalogName());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ public FileIO fileIO() throws IOException {
options,
catalogContext.hadoopConf(),
catalogContext.preferIO(),
catalogContext.fallbackIO());
catalogContext.fallbackIO(),
catalogContext.catalogName());
try {
fileIO = FileIO.get(path, context);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ protected AbstractCatalog(FileIO fileIO, CatalogContext context) {
this.context = context;
}

@Nullable
@Override
public String name() {
return context.catalogName();
}

@Override
public Map<String, String> options() {
return context.options().toMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,10 @@ TableQueryAuthResult authTableQuery(Identifier identifier, @Nullable List<String

// ==================== Catalog Information ==========================

/** The name of this catalog. */
@Nullable
String name();

/** Catalog options for re-creating this catalog. */
Map<String, String> options();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.DataField;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -292,6 +291,7 @@ public static Table loadTable(

CatalogEnvironment catalogEnv =
new CatalogEnvironment(
catalog.name(),
tableIdentifier,
metadata.uuid(),
isRestCatalog && metadata.isExternal() ? null : catalog.catalogLoader(),
Expand All @@ -315,15 +315,17 @@ private static Table createGlobalSystemTable(String tableName, Catalog catalog)
throws Catalog.TableNotExistException {
switch (tableName.toLowerCase()) {
case ALL_TABLE_OPTIONS:
List<Table> tables = listAllTables(catalog);
Map<Identifier, Table> tables = listAllTables(catalog);
Map<Identifier, Map<String, String>> allOptions = new HashMap<>();
for (Table table : tables) {
allOptions.put(Identifier.fromString(table.fullName()), table.options());
for (Map.Entry<Identifier, Table> entry : tables.entrySet()) {
allOptions.put(entry.getKey(), entry.getValue().options());
}
return new AllTableOptionsTable(allOptions);
case ALL_TABLES:
return AllTablesTable.fromTables(
toTableAndSnapshots(catalog, listAllTables(catalog)));
Map<Identifier, Table> allTablesForDisplay = listAllTables(catalog);
Map<Identifier, TableSnapshot> snapshots =
toTableSnapshots(catalog, allTablesForDisplay);
return AllTablesTable.fromTables(allTablesForDisplay, snapshots);
case ALL_PARTITIONS:
return AllPartitionsTable.fromPartitions(
toAllPartitions(catalog, listAllTables(catalog)));
Expand All @@ -335,50 +337,49 @@ private static Table createGlobalSystemTable(String tableName, Catalog catalog)
}
}

private static List<Table> listAllTables(Catalog catalog) {
List<Table> tables = new ArrayList<>();
private static Map<Identifier, Table> listAllTables(Catalog catalog) {
Map<Identifier, Table> tables = new HashMap<>();
for (String database : catalog.listDatabases()) {
try {
for (String name : catalog.listTables(database)) {
tables.add(catalog.getTable(Identifier.create(database, name)));
Identifier identifier = Identifier.create(database, name);
tables.put(identifier, catalog.getTable(identifier));
}
} catch (Catalog.DatabaseNotExistException | Catalog.TableNotExistException ignored) {
}
}
return tables;
}

private static List<Pair<Table, TableSnapshot>> toTableAndSnapshots(
Catalog catalog, List<Table> tables) {
List<Pair<Table, TableSnapshot>> tableAndSnapshots = new ArrayList<>();
for (Table table : tables) {
TableSnapshot snapshot = null;
private static Map<Identifier, TableSnapshot> toTableSnapshots(
Catalog catalog, Map<Identifier, Table> tables) {
Map<Identifier, TableSnapshot> snapshots = new HashMap<>();
for (Identifier identifier : tables.keySet()) {
if (catalog.supportsVersionManagement()) {
try {
Optional<TableSnapshot> optional =
catalog.loadSnapshot(Identifier.fromString(table.fullName()));
Optional<TableSnapshot> optional = catalog.loadSnapshot(identifier);
if (optional.isPresent()) {
snapshot = optional.get();
snapshots.put(identifier, optional.get());
}
} catch (Catalog.TableNotExistException ignored) {
} catch (NotImplementedException ignored) {
// does not support supportsVersionManagement for external paimon table
}
}
tableAndSnapshots.add(Pair.of(table, snapshot));
}
return tableAndSnapshots;
return snapshots;
}

private static Map<Identifier, List<Partition>> toAllPartitions(
Catalog catalog, List<Table> tables) {
Catalog catalog, Map<Identifier, Table> tables) {
Map<Identifier, List<Partition>> allPartitions = new HashMap<>();
for (Table table : tables) {
for (Map.Entry<Identifier, Table> entry : tables.entrySet()) {
Table table = entry.getValue();
if (table.partitionKeys().isEmpty()) {
continue;
}

Identifier identifier = Identifier.fromString(table.fullName());
Identifier identifier = entry.getKey();
try {
List<Partition> partitions = catalog.listPartitions(identifier);
allPartitions.put(identifier, partitions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public Catalog wrapped() {
return wrapped;
}

@Nullable
@Override
public String name() {
return wrapped.name();
}

@Override
public boolean caseSensitive() {
return wrapped.caseSensitive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ public void open(FileStoreTable fileStoreTable, CoreOptions options) {

this.params = options.httpReportMarkDoneActionParams();
this.url = options.httpReportMarkDoneActionUrl();
this.tableName = fileStoreTable.fullName();
// just for compatibility with the old behavior
String fullName = fileStoreTable.fullName();
String[] parts = fullName.split("\\.");
this.tableName = parts.length == 3 ? parts[1] + "." + parts[2] : fullName;
this.location = fileStoreTable.location().toString();

this.mapper = new ObjectMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,17 @@ public RESTCatalog(CatalogContext context, boolean configRequired) {
api.options(),
context.hadoopConf(),
context.preferIO(),
context.fallbackIO());
context.fallbackIO(),
context.catalogName());
this.dataTokenEnabled = api.options().get(RESTTokenFileIO.DATA_TOKEN_ENABLED);
this.tableDefaultOptions = CatalogUtils.tableDefaultOptions(this.context.options().toMap());
}

@Override
public String name() {
return context.catalogName();
}

@Override
public Map<String, String> options() {
return context.options().toMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,12 @@ public String name() {

@Override
public String fullName() {
return identifier().getFullName();
String catalogName = catalogEnvironment.catalogName();
String identifierFullName = identifier().getFullName();
if (catalogName != null) {
return catalogName + "." + identifierFullName;
}
return identifierFullName;
}

public Identifier identifier() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@
/** Catalog environment in table which contains log factory, metastore client factory. */
public class CatalogEnvironment implements Serializable {

private static final long serialVersionUID = 2L;
private static final long serialVersionUID = 3L;

@Nullable private final String catalogName;
@Nullable private final Identifier identifier;
@Nullable private final String uuid;
@Nullable private final CatalogLoader catalogLoader;
Expand All @@ -55,6 +56,7 @@ public class CatalogEnvironment implements Serializable {
private final boolean supportsPartitionModification;

public CatalogEnvironment(
@Nullable String catalogName,
@Nullable Identifier identifier,
@Nullable String uuid,
@Nullable CatalogLoader catalogLoader,
Expand All @@ -63,6 +65,7 @@ public CatalogEnvironment(
@Nullable CatalogContext catalogContext,
boolean supportsVersionManagement,
boolean supportsPartitionModification) {
this.catalogName = catalogName;
this.identifier = identifier;
this.uuid = uuid;
this.catalogLoader = catalogLoader;
Expand All @@ -74,7 +77,12 @@ public CatalogEnvironment(
}

public static CatalogEnvironment empty() {
return new CatalogEnvironment(null, null, null, null, null, null, false, false);
return new CatalogEnvironment(null, null, null, null, null, null, null, false, false);
}

@Nullable
public String catalogName() {
return catalogName;
}

@Nullable
Expand Down Expand Up @@ -173,6 +181,7 @@ public CatalogContext catalogContext() {

public CatalogEnvironment copy(Identifier identifier) {
return new CatalogEnvironment(
catalogName,
identifier,
uuid,
catalogLoader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@
@Public
public interface FormatTable extends Table {

/** The identifier of this table. */
Identifier identifier();

/** Directory location in file system. */
String location();

Expand Down Expand Up @@ -206,6 +209,11 @@ public FormatTableImpl(
this.catalogContext = catalogContext;
}

@Override
public Identifier identifier() {
return identifier;
}

@Override
public String name() {
return identifier.getTableName();
Expand Down
5 changes: 4 additions & 1 deletion paimon-core/src/main/java/org/apache/paimon/table/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ public interface Table extends Serializable {
/** A name to identify this table. */
String name();

/** Full name of the table, default is database.tableName. */
/**
* Full name of the table, default is database.tableName or catalog.database.tableName if
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a bit of a big change. It's not compatible and some systems already rely on it. Can we just modify the Spark side and pass the catalogName?

* catalog is present.
*/
default String fullName() {
return name();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.paimon.table.format;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
Expand Down Expand Up @@ -84,7 +83,7 @@ public BatchTableCommit newCommit() {
table.fileIO(),
formatTablePartitionOnlyValueInPath,
overwrite,
Identifier.fromString(table.fullName()),
table.identifier(),
staticPartition,
syncHiveUri,
table.catalogContext());
Expand Down
Loading