From 83cd37a8302b28e9bcc3f88a562753af07d3838a Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Wed, 11 Mar 2026 20:47:02 +0800 Subject: [PATCH 1/3] [core] Fix incorrect sequenceNumber in manifest after row-tracking compaction --- .../java/org/apache/paimon/append/AppendCompactTask.java | 2 ++ .../org/apache/paimon/spark/sql/RowTrackingTestBase.scala | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java index 77d52b7c7d70..af3e7785beab 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java @@ -35,6 +35,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Objects; @@ -49,6 +50,7 @@ public class AppendCompactTask { public AppendCompactTask(BinaryRow partition, List files) { Preconditions.checkArgument(files != null); + files.sort(Comparator.comparingLong(DataFileMeta::minSequenceNumber)); this.partition = partition; compactBefore = new ArrayList<>(files); compactAfter = new ArrayList<>(); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index 0ea9d21bcf2a..fc174ba7b8ef 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -252,6 +252,14 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), Seq(Row(1, 1, 0, 1), Row(2, 2, 1, 2), Row(3, 3, 2, 3)) ) + + sql("INSERT INTO t VALUES (4, '4')") + sql("INSERT INTO t VALUES (5, '5')") + sql("CALL sys.compact(table => 't')") + checkAnswer( + sql("SELECT min_sequence_number, max_sequence_number FROM `t$files`"), + Seq(Row(1, 5)) + ) } } From 3698596cb53ab6421849a0266dee2a068fc21202 Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Thu, 12 Mar 2026 20:45:28 +0800 Subject: [PATCH 2/3] [core] Fix in CompactProcedure --- .../java/org/apache/paimon/append/AppendCompactTask.java | 2 -- .../apache/paimon/spark/procedure/CompactProcedure.java | 8 ++++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java index af3e7785beab..77d52b7c7d70 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactTask.java @@ -35,7 +35,6 @@ import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Objects; @@ -50,7 +49,6 @@ public class AppendCompactTask { public AppendCompactTask(BinaryRow partition, List files) { Preconditions.checkArgument(files != null); - files.sort(Comparator.comparingLong(DataFileMeta::minSequenceNumber)); this.partition = partition; compactBefore = new ArrayList<>(files); compactAfter = new ArrayList<>(); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 0fe8a76b86a8..109d7d4e4734 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -87,6 +87,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -459,6 +460,13 @@ private void compactUnAwareBucketTable( ser.deserialize( ser.getVersion(), taskIterator.next()); + if (coreOptions.rowTrackingEnabled()) { + task.compactBefore() + .sort( + Comparator.comparingLong( + DataFileMeta + ::minSequenceNumber)); + } messages.add( messageSer.serialize( task.doCompact(table, write))); From 0e957d82db6e3dd1ca9ca90af9501d116996a966 Mon Sep 17 00:00:00 2001 From: Juntao Zhang Date: Sat, 14 Mar 2026 12:12:50 +0800 Subject: [PATCH 3/3] [core] Fix in RowDataFileWriter --- .../apache/paimon/io/RowDataFileWriter.java | 48 +++++++++++++++++-- .../commit/RowTrackingCommitUtils.java | 13 ++++- .../spark/procedure/CompactProcedure.java | 8 ---- .../spark/sql/RowTrackingTestBase.scala | 25 +++++++++- 4 files changed, 81 insertions(+), 13 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 7f8715ab0846..6b06258c6fbb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -25,6 +25,7 @@ import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.stats.SimpleStatsConverter; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.LongCounter; import org.apache.paimon.utils.Pair; @@ -52,6 +53,10 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter writeCols; + private final int seqNumberFieldIndex; + private long minSeqNumber; + private long maxSeqNumber; + private boolean hasNullSeqNumber; public RowDataFileWriter( FileIO fileIO, @@ -76,6 +81,10 @@ public RowDataFileWriter( fileIO, dataFileToFileIndexPath(path), writeSchema, fileIndexOptions); this.fileSource = fileSource; this.writeCols = writeCols; + this.seqNumberFieldIndex = writeSchema.getFieldIndex(SpecialFields.SEQUENCE_NUMBER.name()); + this.minSeqNumber = Long.MAX_VALUE; + this.maxSeqNumber = Long.MIN_VALUE; + this.hasNullSeqNumber = false; } @Override @@ -85,7 +94,7 @@ public void write(InternalRow row) throws IOException { if (dataFileIndexWriter != null) { dataFileIndexWriter.write(row); } - seqNumCounter.add(1L); + updateSeqNumber(row); } @Override @@ -111,8 +120,8 @@ public DataFileMeta result() throws IOException { fileSize, recordCount(), statsPair.getRight(), - seqNumCounter.getValue() - super.recordCount(), - seqNumCounter.getValue() - 1, + minSeqNumber(), + maxSeqNumber(), schemaId, indexResult.independentIndexFile() == null ? Collections.emptyList() @@ -124,4 +133,37 @@ public DataFileMeta result() throws IOException { null, writeCols); } + + private long minSeqNumber() { + if (seqNumberFieldIndex == -1) { + return seqNumCounter.getValue() - super.recordCount(); + } + // minSeqNumber stays at Long.MAX_VALUE when all records have null sequence numbers. + // Returning 0 triggers RowTrackingCommitUtils.assignSnapshotId() to use snapshot ID. + return minSeqNumber == Long.MAX_VALUE ? 0 : minSeqNumber; + } + + private long maxSeqNumber() { + if (seqNumberFieldIndex == -1) { + return seqNumCounter.getValue() - 1; + } + // When hasNullSeqNumber is true, some records have null sequence numbers. + // Returning 0 triggers RowTrackingCommitUtils.assignSnapshotId() to use snapshot ID for + // max. + return hasNullSeqNumber ? 0 : maxSeqNumber; + } + + private void updateSeqNumber(InternalRow row) { + seqNumCounter.add(1L); + + // If sequence number field exists, extract min/max from row data + if (seqNumberFieldIndex != -1 && !row.isNullAt(seqNumberFieldIndex)) { + long seqNum = row.getLong(seqNumberFieldIndex); + minSeqNumber = Math.min(minSeqNumber, seqNum); + maxSeqNumber = Math.max(maxSeqNumber, seqNum); + } else if (seqNumberFieldIndex != -1) { + // Manifest will calculate the correct max based on snapshot id + hasNullSeqNumber = true; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java index d2f3dc8851ef..ced0c18f20db 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RowTrackingCommitUtils.java @@ -49,9 +49,20 @@ public static RowTrackingAssigned assignRowTracking( private static void assignSnapshotId( long snapshotId, List deltaFiles, List snapshotAssigned) { for (ManifestEntry entry : deltaFiles) { - if (entry.file().minSequenceNumber() == 0L) { + long minSeqNumber = entry.file().minSequenceNumber(); + long maxSeqNumber = entry.file().maxSequenceNumber(); + if (minSeqNumber == 0L) { + // Case 1: New file (e.g., from INSERT) + // All records in this file get the current snapshot ID as sequence number snapshotAssigned.add(entry.assignSequenceNumber(snapshotId, snapshotId)); + } else if (maxSeqNumber == 0L) { + // Case 2: File with some modified records + // - min: Preserve original sequence number (from unmodified records) + // - max: Assign current snapshot ID + snapshotAssigned.add(entry.assignSequenceNumber(minSeqNumber, snapshotId)); } else { + // Case 3: Pure compact file (no modified records) + // Preserve original min/max sequence numbers from source files snapshotAssigned.add(entry); } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java index 109d7d4e4734..0fe8a76b86a8 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java @@ -87,7 +87,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -460,13 +459,6 @@ private void compactUnAwareBucketTable( ser.deserialize( ser.getVersion(), taskIterator.next()); - if (coreOptions.rowTrackingEnabled()) { - task.compactBefore() - .sort( - Comparator.comparingLong( - DataFileMeta - ::minSequenceNumber)); - } messages.add( messageSer.serialize( task.doCompact(table, write))); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala index fc174ba7b8ef..c81d142d622c 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala @@ -255,10 +255,33 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase { sql("INSERT INTO t VALUES (4, '4')") sql("INSERT INTO t VALUES (5, '5')") + // snapshot 7: should merge files with sequence numbers [1, 6] sql("CALL sys.compact(table => 't')") checkAnswer( sql("SELECT min_sequence_number, max_sequence_number FROM `t$files`"), - Seq(Row(1, 5)) + Seq(Row(1, 6)) + ) + // snapshot 8: Updated record has null sequence number + sql("UPDATE t SET data = 22 WHERE id = 2") + + // snapshot 9 ~ 10: add new file, and set sequence number to null + sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM range(6, 8)") + sql("UPDATE t SET data = 67 WHERE _SEQUENCE_NUMBER = 9") + checkAnswer( + sql( + "SELECT min_sequence_number, max_sequence_number FROM `t$files` order by min_sequence_number"), + Seq(Row(1, 8), Row(10, 10)) + ) + checkAnswer( + sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t ORDER BY id"), + Seq( + Row(1, 1, 0, 1), + Row(2, 22, 1, 8), + Row(3, 3, 2, 3), + Row(4, 4, 3, 5), + Row(5, 5, 4, 6), + Row(6, 67, 5, 10), + Row(7, 67, 6, 10)) ) } }