diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index e24b4a2795ab..44aeee4980b5 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -380,6 +380,12 @@
String |
Consumer id for recording the offset of consumption in the storage. |
+
+ consumer.changelog-only |
+ false |
+ Boolean |
+ If true, consumer will only affect changelog expiration and will not prevent snapshot from being expired. |
+
consumer.expiration-time |
(none) |
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 78ce3d93678c..3533bb4a6aff 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1383,6 +1383,14 @@ public InlineElement getDescription() {
.withDescription(
"Whether to ignore consumer progress for the newly started job.");
+ public static final ConfigOption CONSUMER_CHANGELOG_ONLY =
+ key("consumer.changelog-only")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If true, consumer will only affect changelog expiration "
+ + "and will not prevent snapshot from being expired.");
+
public static final ConfigOption DYNAMIC_BUCKET_TARGET_ROW_NUM =
key("dynamic-bucket.target-row-num")
.longType()
@@ -2738,6 +2746,7 @@ public ExpireConfig expireConfig() {
.changelogRetainMin(options.getOptional(CHANGELOG_NUM_RETAINED_MIN).orElse(null))
.changelogTimeRetain(options.getOptional(CHANGELOG_TIME_RETAINED).orElse(null))
.changelogMaxDeletes(snapshotExpireLimit())
+ .consumerChangelogOnly(consumerChangelogOnly())
.build();
}
@@ -3368,6 +3377,10 @@ public boolean consumerIgnoreProgress() {
return options.get(CONSUMER_IGNORE_PROGRESS);
}
+ public boolean consumerChangelogOnly() {
+ return options.get(CONSUMER_CHANGELOG_ONLY);
+ }
+
public boolean partitionedTableInMetastore() {
return options.get(METASTORE_PARTITIONED_TABLE);
}
diff --git a/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java b/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java
index 0de3828db033..bce215ed5def 100644
--- a/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java
+++ b/paimon-api/src/main/java/org/apache/paimon/options/ExpireConfig.java
@@ -31,6 +31,7 @@ public class ExpireConfig {
private final Duration changelogTimeRetain;
private final int changelogMaxDeletes;
private final boolean changelogDecoupled;
+ private final boolean consumerChangelogOnly;
public ExpireConfig(
int snapshotRetainMax,
@@ -40,7 +41,8 @@ public ExpireConfig(
int changelogRetainMax,
int changelogRetainMin,
Duration changelogTimeRetain,
- int changelogMaxDeletes) {
+ int changelogMaxDeletes,
+ boolean consumerChangelogOnly) {
this.snapshotRetainMax = snapshotRetainMax;
this.snapshotRetainMin = snapshotRetainMin;
this.snapshotTimeRetain = snapshotTimeRetain;
@@ -53,6 +55,7 @@ public ExpireConfig(
changelogRetainMax > snapshotRetainMax
|| changelogRetainMin > snapshotRetainMin
|| changelogTimeRetain.compareTo(snapshotTimeRetain) > 0;
+ this.consumerChangelogOnly = consumerChangelogOnly;
}
public int getSnapshotRetainMax() {
@@ -91,6 +94,10 @@ public boolean isChangelogDecoupled() {
return changelogDecoupled;
}
+ public boolean isConsumerChangelogOnly() {
+ return consumerChangelogOnly;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -106,6 +113,7 @@ public static final class Builder {
private Integer changelogRetainMin = null;
private Duration changelogTimeRetain = null;
private Integer changelogMaxDeletes = null;
+ private boolean consumerChangelogOnly = false;
public static Builder builder() {
return new Builder();
@@ -151,6 +159,11 @@ public Builder changelogMaxDeletes(Integer changelogMaxDeletes) {
return this;
}
+ public Builder consumerChangelogOnly(boolean consumerChangelogOnly) {
+ this.consumerChangelogOnly = consumerChangelogOnly;
+ return this;
+ }
+
public ExpireConfig build() {
return new ExpireConfig(
snapshotRetainMax,
@@ -160,7 +173,8 @@ public ExpireConfig build() {
changelogRetainMax == null ? snapshotRetainMax : changelogRetainMax,
changelogRetainMin == null ? snapshotRetainMin : changelogRetainMin,
changelogTimeRetain == null ? snapshotTimeRetain : changelogTimeRetain,
- changelogMaxDeletes == null ? snapshotMaxDeletes : changelogMaxDeletes);
+ changelogMaxDeletes == null ? snapshotMaxDeletes : changelogMaxDeletes,
+ consumerChangelogOnly);
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index fcb79eb2198a..beeb6ae2b510 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -124,8 +124,11 @@ public int expire() {
long maxExclusive = latestSnapshotId - retainMin + 1;
// the snapshot being read by the consumer cannot be deleted
- maxExclusive =
- Math.min(maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));
+ if (!expireConfig.isConsumerChangelogOnly()) {
+ maxExclusive =
+ Math.min(
+ maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));
+ }
// protected by 'snapshot.expire.limit'
// (the maximum number of snapshots allowed to expire at a time)
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
index d63af5fd2c3e..2e929f2e1f2f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java
@@ -24,6 +24,8 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.TestFileStore;
import org.apache.paimon.TestKeyValueGenerator;
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.FileIO;
@@ -661,6 +663,48 @@ public void testManifestFileSkippingSetFileNotFoundException() throws Exception
assertSnapshot(latestSnapshotId, allData, snapshotPositions);
}
+ @Test
+ public void testConsumerChangelogOnly() throws Exception {
+ List allData = new ArrayList<>();
+ List snapshotPositions = new ArrayList<>();
+ commit(10, allData, snapshotPositions);
+
+ // create a consumer at snapshot 3
+ ConsumerManager consumerManager = new ConsumerManager(fileIO, new Path(tempDir.toUri()));
+ consumerManager.resetConsumer("myConsumer", new Consumer(3));
+
+ // without consumerChangelogOnly, consumer should prevent snapshot expiration
+ ExpireConfig configDefault =
+ ExpireConfig.builder()
+ .snapshotRetainMin(1)
+ .snapshotRetainMax(1)
+ .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+ .build();
+ store.newExpire(configDefault).expire();
+
+ // earliest snapshot should be 3 (protected by consumer)
+ assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(3L);
+
+ // with consumerChangelogOnly=true, consumer should NOT prevent snapshot expiration
+ ExpireConfig configChangelogOnly =
+ ExpireConfig.builder()
+ .snapshotRetainMin(1)
+ .snapshotRetainMax(1)
+ .snapshotTimeRetain(Duration.ofMillis(Long.MAX_VALUE))
+ .consumerChangelogOnly(true)
+ .build();
+ store.newExpire(configChangelogOnly).expire();
+
+ int latestSnapshotId2 = requireNonNull(snapshotManager.latestSnapshotId()).intValue();
+ // earliest snapshot should be latestSnapshotId (consumer no longer protects snapshots)
+ assertThat(snapshotManager.earliestSnapshotId()).isEqualTo((long) latestSnapshotId2);
+ assertSnapshot(latestSnapshotId2, allData, snapshotPositions);
+
+ // clean up consumer file so assertCleaned passes
+ consumerManager.deleteConsumer("myConsumer");
+ store.assertCleaned();
+ }
+
private TestFileStore createStore() {
ThreadLocalRandom random = ThreadLocalRandom.current();