diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
new file mode 100644
index 000000000000..feeedbc7b558
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileKeyRangesTable.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.casting.CastExecutor;
+import org.apache.paimon.casting.CastExecutors;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.LazyGenericRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.predicate.Equal;
+import org.apache.paimon.predicate.In;
+import org.apache.paimon.predicate.LeafBinaryFunction;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.LeafPredicateExtractor;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.ReadonlyTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.InnerTableRead;
+import org.apache.paimon.table.source.InnerTableScan;
+import org.apache.paimon.table.source.ReadOnceTableScan;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.IteratorRecordReader;
+import org.apache.paimon.utils.ProjectedRow;
+import org.apache.paimon.utils.RowDataToObjectArrayConverter;
+import org.apache.paimon.utils.SerializationUtils;
+import org.apache.paimon.utils.TypeUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+
+/**
+ * A {@link Table} for showing key ranges and file path information for each data file, supporting
+ * diagnosis of data distribution and Global Index (PIP-41) coverage.
+ */
+public class FileKeyRangesTable implements ReadonlyTable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String FILE_KEY_RANGES = "file_key_ranges";
+
+    public static final RowType TABLE_TYPE =
+            new RowType(
+                    Arrays.asList(
+                            new DataField(0, "partition", SerializationUtils.newStringType(true)),
+                            new DataField(1, "bucket", new IntType(false)),
+                            new DataField(2, "file_path", SerializationUtils.newStringType(false)),
+                            new DataField(
+                                    3, "file_format", SerializationUtils.newStringType(false)),
+                            new DataField(4, "schema_id", new BigIntType(false)),
+                            new DataField(5, "level", new IntType(false)),
+                            new DataField(6, "record_count", new BigIntType(false)),
+                            new DataField(7, "file_size_in_bytes", new BigIntType(false)),
+                            new DataField(8, "min_key", SerializationUtils.newStringType(true)),
+                            new DataField(9, "max_key", SerializationUtils.newStringType(true)),
+                            new DataField(10, "first_row_id", new BigIntType(true))));
+
+    private final FileStoreTable storeTable;
+
+    public FileKeyRangesTable(FileStoreTable storeTable) {
+        this.storeTable = storeTable;
+    }
+
+    @Override
+    public String name() {
+        return storeTable.name() + SYSTEM_TABLE_SPLITTER + FILE_KEY_RANGES;
+    }
+
+    @Override
+    public RowType rowType() {
+        return TABLE_TYPE;
+    }
+
+    @Override
+    public List<String> primaryKeys() {
+        return Collections.singletonList("file_path");
+    }
+
+    @Override
+    public FileIO fileIO() {
+        return storeTable.fileIO();
+    }
+
+    @Override
+    public InnerTableScan newScan() {
+        return new FileKeyRangesScan(storeTable);
+    }
+
+    @Override
+    public InnerTableRead newRead() {
+        return new FileKeyRangesRead(storeTable.schemaManager(), storeTable);
+    }
+
+    @Override
+    public Table copy(Map<String, String> dynamicOptions) {
+        return new FileKeyRangesTable(storeTable.copy(dynamicOptions));
+    }
+
+    private static class FileKeyRangesScan extends ReadOnceTableScan {
+
+        @Nullable private LeafPredicate partitionPredicate;
+        @Nullable private LeafPredicate bucketPredicate;
+        @Nullable private LeafPredicate levelPredicate;
+
+        private final FileStoreTable fileStoreTable;
+
+        public FileKeyRangesScan(FileStoreTable fileStoreTable) {
+            this.fileStoreTable = fileStoreTable;
+        }
+
+        @Override
+        public InnerTableScan withFilter(Predicate pushdown) {
+            if (pushdown == null) {
+                return this;
+            }
+
+            Map<String, LeafPredicate> leafPredicates =
+                    pushdown.visit(LeafPredicateExtractor.INSTANCE);
+            this.partitionPredicate = leafPredicates.get("partition");
+            this.bucketPredicate = leafPredicates.get("bucket");
+            this.levelPredicate = leafPredicates.get("level");
+            return this;
+        }
+
+        @Override
+        public Plan innerPlan() {
+            SnapshotReader snapshotReader = fileStoreTable.newSnapshotReader();
+            if (partitionPredicate != null) {
+                List<String> partitionKeys = fileStoreTable.partitionKeys();
+                RowType partitionType = fileStoreTable.schema().logicalPartitionType();
+                if (partitionPredicate.function() instanceof Equal) {
+                    LinkedHashMap<String, String> partSpec =
+                            parsePartitionSpec(
+                                    partitionPredicate.literals().get(0).toString(), partitionKeys);
+                    if (partSpec == null) {
+                        return Collections::emptyList;
+                    }
+                    snapshotReader.withPartitionFilter(partSpec);
+                } else if (partitionPredicate.function() instanceof In) {
+                    List<Predicate> orPredicates = new ArrayList<>();
+                    PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
+                    for (Object literal : partitionPredicate.literals()) {
+                        LinkedHashMap<String, String> partSpec =
+                                parsePartitionSpec(literal.toString(), partitionKeys);
+                        if (partSpec == null) {
+                            continue;
+                        }
+                        List<Predicate> andPredicates = new ArrayList<>();
+                        for (int i = 0; i < partitionKeys.size(); i++) {
+                            Object value =
+                                    TypeUtils.castFromString(
+                                            partSpec.get(partitionKeys.get(i)),
+                                            partitionType.getTypeAt(i));
+                            andPredicates.add(partBuilder.equal(i, value));
+                        }
+                        orPredicates.add(PredicateBuilder.and(andPredicates));
+                    }
+                    if (!orPredicates.isEmpty()) {
+                        snapshotReader.withPartitionFilter(PredicateBuilder.or(orPredicates));
+                    }
+                } else if (partitionPredicate.function() instanceof LeafBinaryFunction) {
+                    LinkedHashMap<String, String> partSpec =
+                            parsePartitionSpec(
+                                    partitionPredicate.literals().get(0).toString(), partitionKeys);
+                    if (partSpec != null) {
+                        PredicateBuilder partBuilder = new PredicateBuilder(partitionType);
+                        List<Predicate> predicates = new ArrayList<>();
+                        for (int i = 0; i < partitionKeys.size(); i++) {
+                            Object value =
+                                    TypeUtils.castFromString(
+                                            partSpec.get(partitionKeys.get(i)),
+                                            partitionType.getTypeAt(i));
+                            predicates.add(
+                                    new LeafPredicate(
+                                            partitionPredicate.function(),
+                                            partitionType.getTypeAt(i),
+                                            i,
+                                            partitionKeys.get(i),
+                                            Collections.singletonList(value)));
+                        }
+                        snapshotReader.withPartitionFilter(PredicateBuilder.and(predicates));
+                    }
+                }
+            }
+
+            return () ->
+                    snapshotReader.partitions().stream()
+                            .map(p -> new FilesTable.FilesSplit(p, bucketPredicate, levelPredicate))
+                            .collect(Collectors.toList());
+        }
+
+        @Nullable
+        private LinkedHashMap<String, String> parsePartitionSpec(
+                String partitionStr, List<String> partitionKeys) {
+            if (partitionStr.startsWith("{")) {
+                partitionStr = partitionStr.substring(1);
+            }
+            if (partitionStr.endsWith("}")) {
+                partitionStr = partitionStr.substring(0, partitionStr.length() - 1);
+            }
+            String[] partFields = partitionStr.split(", ");
+            if (partitionKeys.size() != partFields.length) {
+                return null;
+            }
+            LinkedHashMap<String, String> partSpec = new LinkedHashMap<>();
+            for (int i = 0; i < partitionKeys.size(); i++) {
+                partSpec.put(partitionKeys.get(i), partFields[i]);
+            }
+            return partSpec;
+        }
+    }
+
+    private static class FileKeyRangesRead implements InnerTableRead {
+
+        private final SchemaManager schemaManager;
+
+        private final FileStoreTable storeTable;
+
+        private RowType readType;
+
+        private FileKeyRangesRead(SchemaManager schemaManager, FileStoreTable fileStoreTable) {
+            this.schemaManager = schemaManager;
+            this.storeTable = fileStoreTable;
+        }
+
+        @Override
+        public InnerTableRead withFilter(Predicate predicate) {
+            // TODO
+            return this;
+        }
+
+        @Override
+        public InnerTableRead withReadType(RowType readType) {
+            this.readType = readType;
+            return this;
+        }
+
+        @Override
+        public TableRead withIOManager(IOManager ioManager) {
+            return this;
+        }
+
+        @Override
+        public RecordReader<InternalRow> createReader(Split split) {
+            if (!(split instanceof FilesTable.FilesSplit)) {
+                throw new IllegalArgumentException("Unsupported split: " + split.getClass());
+            }
+            FilesTable.FilesSplit filesSplit = (FilesTable.FilesSplit) split;
+            List<Split> splits = filesSplit.splits(storeTable);
+            if (splits.isEmpty()) {
+                return new IteratorRecordReader<>(Collections.emptyIterator());
+            }
+
+            @SuppressWarnings("unchecked")
+            CastExecutor<InternalRow, BinaryString> partitionCastExecutor =
+                    (CastExecutor<InternalRow, BinaryString>)
+                            CastExecutors.resolveToString(
+                                    storeTable.schema().logicalPartitionType());
+
+            Function<Long, RowDataToObjectArrayConverter> keyConverters =
+                    new Function<Long, RowDataToObjectArrayConverter>() {
+                        final Map<Long, RowDataToObjectArrayConverter> keyConverterMap =
+                                new HashMap<>();
+
+                        @Override
+                        public RowDataToObjectArrayConverter apply(Long schemaId) {
+                            return keyConverterMap.computeIfAbsent(
+                                    schemaId,
+                                    k -> {
+                                        TableSchema dataSchema = schemaManager.schema(schemaId);
+                                        RowType keysType =
+                                                dataSchema.logicalTrimmedPrimaryKeysType();
+                                        return keysType.getFieldCount() > 0
+                                                ? new RowDataToObjectArrayConverter(
+                                                        dataSchema.logicalTrimmedPrimaryKeysType())
+                                                : new RowDataToObjectArrayConverter(
+                                                        dataSchema.logicalRowType());
+                                    });
+                        }
+                    };
+
+            List<Iterator<InternalRow>> iteratorList = new ArrayList<>();
+            for (Split dataSplit : splits) {
+                iteratorList.add(
+                        Iterators.transform(
+                                ((DataSplit) dataSplit).dataFiles().iterator(),
+                                file ->
+                                        toRow(
+                                                (DataSplit) dataSplit,
+                                                partitionCastExecutor,
+                                                keyConverters,
+                                                file)));
+            }
+            Iterator<InternalRow> rows = Iterators.concat(iteratorList.iterator());
+            if (readType != null) {
+                rows =
+                        Iterators.transform(
+                                rows,
+                                row ->
+                                        ProjectedRow.from(readType, FileKeyRangesTable.TABLE_TYPE)
+                                                .replaceRow(row));
+            }
+            return new IteratorRecordReader<>(rows);
+        }
+
+        private LazyGenericRow toRow(
+                DataSplit dataSplit,
+                CastExecutor<InternalRow, BinaryString> partitionCastExecutor,
+                Function<Long, RowDataToObjectArrayConverter> keyConverters,
+                DataFileMeta file) {
+            @SuppressWarnings("unchecked")
+            Supplier<Object>[] fields =
+                    new Supplier[] {
+                        () ->
+                                dataSplit.partition() == null
+                                        ? null
+                                        : partitionCastExecutor.cast(dataSplit.partition()),
+                        dataSplit::bucket,
+                        () ->
+                                BinaryString.fromString(
+                                        file.externalPath()
+                                                .orElse(
+                                                        dataSplit.bucketPath()
+                                                                + "/"
+                                                                + file.fileName())),
+                        () ->
+                                BinaryString.fromString(
+                                        DataFilePathFactory.formatIdentifier(file.fileName())),
+                        file::schemaId,
+                        file::level,
+                        file::rowCount,
+                        file::fileSize,
+                        () ->
+                                file.minKey().getFieldCount() <= 0
+                                        ? null
+                                        : BinaryString.fromString(
+                                                Arrays.toString(
+                                                        keyConverters
+                                                                .apply(file.schemaId())
+                                                                .convert(file.minKey()))),
+                        () ->
+                                file.maxKey().getFieldCount() <= 0
+                                        ? null
+                                        : BinaryString.fromString(
+                                                Arrays.toString(
+                                                        keyConverters
+                                                                .apply(file.schemaId())
+                                                                .convert(file.maxKey()))),
+                        file::firstRowId
+                    };
+
+            return new LazyGenericRow(fields);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
index fcfaafaea5eb..e0a89d92bfba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java
@@ -282,13 +282,13 @@ private LinkedHashMap<String, String> parsePartitionSpec(
         }
     }
 
-    private static class FilesSplit extends SingletonSplit {
+    static class FilesSplit extends SingletonSplit {
 
         @Nullable private final BinaryRow partition;
 
         @Nullable private final LeafPredicate bucketPredicate;
         @Nullable private final LeafPredicate levelPredicate;
 
-        private FilesSplit(
+        FilesSplit(
                 @Nullable BinaryRow partition,
                 @Nullable LeafPredicate bucketPredicate,
                 @Nullable LeafPredicate levelPredicate) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
index e1411d290f35..93e0453ab744 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SystemTableLoader.java
@@ -42,6 +42,7 @@
 import static org.apache.paimon.table.system.BucketsTable.BUCKETS;
 import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
 import static org.apache.paimon.table.system.ConsumersTable.CONSUMERS;
+import static org.apache.paimon.table.system.FileKeyRangesTable.FILE_KEY_RANGES;
 import static org.apache.paimon.table.system.FilesTable.FILES;
 import static org.apache.paimon.table.system.ManifestsTable.MANIFESTS;
 import static org.apache.paimon.table.system.OptionsTable.OPTIONS;
@@ -67,6 +68,7 @@ public class SystemTableLoader {
                     .put(BUCKETS, BucketsTable::new)
                     .put(AUDIT_LOG, AuditLogTable::new)
                     .put(FILES, FilesTable::new)
+                    .put(FILE_KEY_RANGES, FileKeyRangesTable::new)
                     .put(TAGS, TagsTable::new)
                     .put(BRANCHES, BranchesTable::new)
                     .put(CONSUMERS, ConsumersTable::new)
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
new file mode 100644
index 000000000000..ce675e2aaefa
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/FileKeyRangesTableTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.predicate.In;
+import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.paimon.catalog.Identifier.SYSTEM_TABLE_SPLITTER;
+import static org.apache.paimon.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileKeyRangesTable}. */
+public class FileKeyRangesTableTest extends TableTestBase {
+
+    private static final String TABLE_NAME = "MyTable";
+
+    private FileStoreTable table;
+    private FileKeyRangesTable fileKeyRangesTable;
+
+    @BeforeEach
+    public void before() throws Exception {
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pk", DataTypes.INT())
+                        .column("pt", DataTypes.INT())
+                        .column("col", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .option("bucket", "2")
+                        .build();
+        catalog.createTable(identifier(TABLE_NAME), schema, false);
+        table = (FileStoreTable) catalog.getTable(identifier(TABLE_NAME));
+
+        Identifier fileKeyRangesId =
+                identifier(
+                        TABLE_NAME
+                                + SYSTEM_TABLE_SPLITTER
+                                + FileKeyRangesTable.FILE_KEY_RANGES);
+        fileKeyRangesTable = (FileKeyRangesTable) catalog.getTable(fileKeyRangesId);
+
+        // snapshot 1: write two partitions
+        write(table, GenericRow.of(1, 1, 10), GenericRow.of(1, 2, 20));
+        // snapshot 2: write more rows
+        write(table, GenericRow.of(2, 1, 30), GenericRow.of(2, 2, 40));
+    }
+
+    @Test
+    public void testReadBasic() throws Exception {
+        List<String> rows = readPartBucketLevel(null);
+        assertThat(rows).isNotEmpty();
+
+        // verify that file_path, min_key and max_key fields are readable
+        ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
+        rb.newRead()
+                .createReader(rb.newScan().plan())
+                .forEachRemaining(
+                        row -> {
+                            // file_path (index 2) should be non-null
+                            assertThat(row.getString(2)).isNotNull();
+                            // min_key (index 8) and max_key (index 9) should be non-null for
+                            // primary key tables
+                            assertThat(row.getString(8)).isNotNull();
+                            assertThat(row.getString(9)).isNotNull();
+                        });
+    }
+
+    @Test
+    public void testPartitionFilter() throws Exception {
+        PredicateBuilder builder = new PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+        List<String> rows = readPartBucketLevel(builder.equal(0, BinaryString.fromString("{1}")));
+        assertThat(rows).isNotEmpty();
+        for (String row : rows) {
+            assertThat(row).startsWith("{1}-");
+        }
+    }
+
+    @Test
+    public void testBucketFilter() throws Exception {
+        PredicateBuilder builder = new PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+        List<String> rows = readPartBucketLevel(builder.equal(1, 0));
+        assertThat(rows).isNotEmpty();
+        for (String row : rows) {
+            String[] parts = row.split("-");
+            assertThat(parts[1]).isEqualTo("0");
+        }
+    }
+
+    @Test
+    public void testLevelFilter() throws Exception {
+        // compact to produce level-5 files
+        compact(table, row(1), 0);
+        compact(table, row(2), 0);
+
+        PredicateBuilder builder = new PredicateBuilder(FileKeyRangesTable.TABLE_TYPE);
+        List<String> rows = readPartBucketLevel(builder.equal(5, 5));
+        assertThat(rows).isNotEmpty();
+        for (String row : rows) {
+            String[] parts = row.split("-");
+            assertThat(parts[2]).isEqualTo("5");
+        }
+    }
+
+    @Test
+    public void testFirstRowId() throws Exception {
+        // first_row_id (index 10) is nullable BigInt; for a primary-key table without
+        // first_row_id configured it will be null - just verify the field is accessible
+        ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
+        rb.newRead()
+                .createReader(rb.newScan().plan())
+                .forEachRemaining(
+                        row -> {
+                            // accessing index 10 should not throw even if null
+                            if (!row.isNullAt(10)) {
+                                assertThat(row.getLong(10)).isGreaterThanOrEqualTo(0L);
+                            }
+                        });
+    }
+
+    @Test
+    public void testSystemTableName() throws Exception {
+        Identifier id =
+                identifier(
+                        TABLE_NAME
+                                + SYSTEM_TABLE_SPLITTER
+                                + FileKeyRangesTable.FILE_KEY_RANGES);
+        FileKeyRangesTable t = (FileKeyRangesTable) catalog.getTable(id);
+        assertThat(t.rowType().getFieldCount()).isEqualTo(11);
+    }
+
+    @Test
+    public void testPartitionInFilter() throws Exception {
+        DataField partitionField = FileKeyRangesTable.TABLE_TYPE.getFields().get(0);
+        Predicate inPredicate =
+                new LeafPredicate(
+                        In.INSTANCE,
+                        partitionField.type(),
+                        0,
+                        partitionField.name(),
+                        Arrays.asList(
+                                BinaryString.fromString("{1}"), BinaryString.fromString("{2}")));
+        List<String> rows = readPartBucketLevel(inPredicate);
+        assertThat(rows).isNotEmpty();
+        boolean hasPt1 = false;
+        boolean hasPt2 = false;
+        for (String row : rows) {
+            if (row.startsWith("{1}-")) {
+                hasPt1 = true;
+            }
+            if (row.startsWith("{2}-")) {
+                hasPt2 = true;
+            }
+        }
+        assertThat(hasPt1).isTrue();
+        assertThat(hasPt2).isTrue();
+    }
+
+    private List<String> readPartBucketLevel(Predicate predicate) throws IOException {
+        ReadBuilder rb = fileKeyRangesTable.newReadBuilder();
+        if (predicate != null) {
+            rb = rb.withFilter(predicate);
+        }
+        List<String> rows = new ArrayList<>();
+        rb.newRead()
+                .createReader(rb.newScan().plan())
+                .forEachRemaining(
+                        row ->
+                                rows.add(
+                                        row.getString(0)
+                                                + "-"
+                                                + row.getInt(1)
+                                                + "-"
+                                                + row.getInt(5)));
+        return rows;
+    }
+}