From bd86ab41d60ed92e526b7eb15c0e533579dca65a Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sat, 21 Mar 2026 22:29:23 +0800 Subject: [PATCH 1/2] [core] Introduce 'consumer.changelog-only' to keep less snapshots --- .../generated/core_configuration.html | 6 ++++++ .../java/org/apache/paimon/CoreOptions.java | 13 +++++++++++++ .../apache/paimon/options/ExpireConfig.java | 18 ++++++++++++++++-- .../paimon/table/ExpireSnapshotsImpl.java | 7 +++++-- 4 files changed, 40 insertions(+), 4 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index e24b4a2795ab..44aeee4980b5 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -380,6 +380,12 @@ String Consumer id for recording the offset of consumption in the storage. + +
consumer.changelog-only
+ false + Boolean + If true, consumer will only affect changelog expiration and will not prevent snapshot from being expired. +
consumer.expiration-time
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 78ce3d93678c..3533bb4a6aff 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -1383,6 +1383,14 @@ public InlineElement getDescription() { .withDescription( "Whether to ignore consumer progress for the newly started job."); + public static final ConfigOption CONSUMER_CHANGELOG_ONLY = + key("consumer.changelog-only") + .booleanType() + .defaultValue(false) + .withDescription( + "If true, consumer will only affect changelog expiration " + + "and will not prevent snapshot from being expired."); + public static final ConfigOption DYNAMIC_BUCKET_TARGET_ROW_NUM = key("dynamic-bucket.target-row-num") .longType() @@ -2738,6 +2746,7 @@ public ExpireConfig expireConfig() { .changelogRetainMin(options.getOptional(CHANGELOG_NUM_RETAINED_MIN).orElse(null)) .changelogTimeRetain(options.getOptional(CHANGELOG_TIME_RETAINED).orElse(null)) .changelogMaxDeletes(snapshotExpireLimit()) + .consumerChangelogOnly(consumerChangelogOnly()) .build(); } @@ -3368,6 +3377,10 @@ public boolean consumerIgnoreProgress() { return options.get(CONSUMER_IGNORE_PROGRESS); } + public boolean consumerChangelogOnly() { + return options.get(CONSUMER_CHANGELOG_ONLY); + } + public boolean partitionedTableInMetastore() { return options.get(METASTORE_PARTITIONED_TABLE); } diff --git a/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java b/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java index 0de3828db033..bce215ed5def 100644 --- a/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java +++ b/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java @@ -31,6 +31,7 @@ public class ExpireConfig { private final Duration changelogTimeRetain; private final int changelogMaxDeletes; private final boolean changelogDecoupled; + private final boolean consumerChangelogOnly; public ExpireConfig( int snapshotRetainMax, @@ -40,7 +41,8 @@ public ExpireConfig( int changelogRetainMax, int changelogRetainMin, Duration changelogTimeRetain, - int changelogMaxDeletes) { + int changelogMaxDeletes, + boolean consumerChangelogOnly) { this.snapshotRetainMax = snapshotRetainMax; this.snapshotRetainMin = snapshotRetainMin; this.snapshotTimeRetain = snapshotTimeRetain; @@ -53,6 +55,7 @@ public ExpireConfig( changelogRetainMax > snapshotRetainMax || changelogRetainMin > snapshotRetainMin || changelogTimeRetain.compareTo(snapshotTimeRetain) > 0; + this.consumerChangelogOnly = consumerChangelogOnly; } public int getSnapshotRetainMax() { @@ -91,6 +94,10 @@ public boolean isChangelogDecoupled() { return changelogDecoupled; } + public boolean isConsumerChangelogOnly() { + return consumerChangelogOnly; + } + public static Builder builder() { return new Builder(); } @@ -106,6 +113,7 @@ public static final class Builder { private Integer changelogRetainMin = null; private Duration changelogTimeRetain = null; private Integer changelogMaxDeletes = null; + private boolean consumerChangelogOnly = false; public static Builder builder() { return new Builder(); @@ -151,6 +159,11 @@ public Builder changelogMaxDeletes(Integer changelogMaxDeletes) { return this; } + public Builder consumerChangelogOnly(boolean consumerChangelogOnly) { + this.consumerChangelogOnly = consumerChangelogOnly; + return this; + } + public ExpireConfig build() { return new ExpireConfig( snapshotRetainMax, @@ -160,7 +173,8 @@ public ExpireConfig build() { changelogRetainMax == null ? snapshotRetainMax : changelogRetainMax, changelogRetainMin == null ? snapshotRetainMin : changelogRetainMin, changelogTimeRetain == null ? snapshotTimeRetain : changelogTimeRetain, - changelogMaxDeletes == null ? snapshotMaxDeletes : changelogMaxDeletes); + changelogMaxDeletes == null ? snapshotMaxDeletes : changelogMaxDeletes, + consumerChangelogOnly); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index fcb79eb2198a..beeb6ae2b510 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -124,8 +124,11 @@ public int expire() { long maxExclusive = latestSnapshotId - retainMin + 1; // the snapshot being read by the consumer cannot be deleted - maxExclusive = - Math.min(maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE)); + if (!expireConfig.isConsumerChangelogOnly()) { + maxExclusive = + Math.min( + maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE)); + } // protected by 'snapshot.expire.limit' // (the maximum number of snapshots allowed to expire at a time) From 3a4a63bb2e56c3b5b746eff5fda53f53bc124723 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Sat, 21 Mar 2026 22:35:50 +0800 Subject: [PATCH 2/2] add test --- .../paimon/operation/ExpireSnapshotsTest.java | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index d63af5fd2c3e..2e929f2e1f2f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -24,6 +24,8 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.TestFileStore; import org.apache.paimon.TestKeyValueGenerator; +import org.apache.paimon.consumer.Consumer; +import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.fs.FileIO; @@ -661,6 +663,48 @@ public void testManifestFileSkippingSetFileNotFoundException() throws Exception assertSnapshot(latestSnapshotId, allData, snapshotPositions); } + @Test + public void testConsumerChangelogOnly() throws Exception { + List allData = new ArrayList<>(); + List snapshotPositions = new ArrayList<>(); + commit(10, allData, snapshotPositions); + + // create a consumer at snapshot 3 + ConsumerManager consumerManager = new ConsumerManager(fileIO, new Path(tempDir.toUri())); + consumerManager.resetConsumer("myConsumer", new Consumer(3)); + + // without consumerChangelogOnly, consumer should prevent snapshot expiration + ExpireConfig configDefault = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(1) + .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) + .build(); + store.newExpire(configDefault).expire(); + + // earliest snapshot should be 3 (protected by consumer) + assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(3L); + + // with consumerChangelogOnly=true, consumer should NOT prevent snapshot expiration + ExpireConfig configChangelogOnly = + ExpireConfig.builder() + .snapshotRetainMin(1) + .snapshotRetainMax(1) + .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE)) + .consumerChangelogOnly(true) + .build(); + store.newExpire(configChangelogOnly).expire(); + + int latestSnapshotId2 = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); + // earliest snapshot should be latestSnapshotId (consumer no longer protects snapshots) + assertThat(snapshotManager.earliestSnapshotId()).isEqualTo((long) latestSnapshotId2); + assertSnapshot(latestSnapshotId2, allData, snapshotPositions); + + // clean up consumer file so assertCleaned passes + consumerManager.deleteConsumer("myConsumer"); + store.assertCleaned(); + } + private TestFileStore createStore() { ThreadLocalRandom random = ThreadLocalRandom.current();