Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,12 @@
<td>String</td>
<td>Consumer id for recording the offset of consumption in the storage.</td>
</tr>
<tr>
<td><h5>consumer.changelog-only</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>If true, consumer will only affect changelog expiration and will not prevent snapshot from being expired.</td>
</tr>
<tr>
<td><h5>consumer.expiration-time</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
13 changes: 13 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1383,6 +1383,14 @@ public InlineElement getDescription() {
.withDescription(
"Whether to ignore consumer progress for the newly started job.");

/**
 * Boolean option registered under the key {@code consumer.changelog-only}; defaults to
 * {@code false}.
 *
 * <p>Per its description, enabling it limits a consumer's influence to changelog expiration, so
 * the consumer no longer prevents snapshots from being expired.
 */
public static final ConfigOption<Boolean> CONSUMER_CHANGELOG_ONLY =
key("consumer.changelog-only")
.booleanType()
.defaultValue(false)
.withDescription(
"If true, consumer will only affect changelog expiration "
+ "and will not prevent snapshot from being expired.");

public static final ConfigOption<Long> DYNAMIC_BUCKET_TARGET_ROW_NUM =
key("dynamic-bucket.target-row-num")
.longType()
Expand Down Expand Up @@ -2738,6 +2746,7 @@ public ExpireConfig expireConfig() {
.changelogRetainMin(options.getOptional(CHANGELOG_NUM_RETAINED_MIN).orElse(null))
.changelogTimeRetain(options.getOptional(CHANGELOG_TIME_RETAINED).orElse(null))
.changelogMaxDeletes(snapshotExpireLimit())
.consumerChangelogOnly(consumerChangelogOnly())
.build();
}

Expand Down Expand Up @@ -3368,6 +3377,10 @@ public boolean consumerIgnoreProgress() {
return options.get(CONSUMER_IGNORE_PROGRESS);
}

/**
 * Returns the value configured for {@code CONSUMER_CHANGELOG_ONLY}
 * ({@code consumer.changelog-only}).
 */
public boolean consumerChangelogOnly() {
return options.get(CONSUMER_CHANGELOG_ONLY);
}

/** Returns the value configured for {@code METASTORE_PARTITIONED_TABLE}. */
public boolean partitionedTableInMetastore() {
return options.get(METASTORE_PARTITIONED_TABLE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class ExpireConfig {
private final Duration changelogTimeRetain;
private final int changelogMaxDeletes;
private final boolean changelogDecoupled;
private final boolean consumerChangelogOnly;

public ExpireConfig(
int snapshotRetainMax,
Expand All @@ -40,7 +41,8 @@ public ExpireConfig(
int changelogRetainMax,
int changelogRetainMin,
Duration changelogTimeRetain,
int changelogMaxDeletes) {
int changelogMaxDeletes,
boolean consumerChangelogOnly) {
this.snapshotRetainMax = snapshotRetainMax;
this.snapshotRetainMin = snapshotRetainMin;
this.snapshotTimeRetain = snapshotTimeRetain;
Expand All @@ -53,6 +55,7 @@ public ExpireConfig(
changelogRetainMax > snapshotRetainMax
|| changelogRetainMin > snapshotRetainMin
|| changelogTimeRetain.compareTo(snapshotTimeRetain) > 0;
this.consumerChangelogOnly = consumerChangelogOnly;
}

public int getSnapshotRetainMax() {
Expand Down Expand Up @@ -91,6 +94,10 @@ public boolean isChangelogDecoupled() {
return changelogDecoupled;
}

/**
 * Returns the {@code consumerChangelogOnly} flag this config was constructed with.
 *
 * @return the stored flag value
 */
public boolean isConsumerChangelogOnly() {
return consumerChangelogOnly;
}

/** Returns a new {@link Builder} instance for assembling an {@code ExpireConfig}. */
public static Builder builder() {
return new Builder();
}
Expand All @@ -106,6 +113,7 @@ public static final class Builder {
private Integer changelogRetainMin = null;
private Duration changelogTimeRetain = null;
private Integer changelogMaxDeletes = null;
private boolean consumerChangelogOnly = false;

public static Builder builder() {
return new Builder();
Expand Down Expand Up @@ -151,6 +159,11 @@ public Builder changelogMaxDeletes(Integer changelogMaxDeletes) {
return this;
}

/**
 * Stores the {@code consumerChangelogOnly} flag on this builder.
 *
 * @param consumerChangelogOnly the flag value to record
 * @return this builder, to allow call chaining
 */
public Builder consumerChangelogOnly(boolean consumerChangelogOnly) {
this.consumerChangelogOnly = consumerChangelogOnly;
return this;
}

public ExpireConfig build() {
return new ExpireConfig(
snapshotRetainMax,
Expand All @@ -160,7 +173,8 @@ public ExpireConfig build() {
changelogRetainMax == null ? snapshotRetainMax : changelogRetainMax,
changelogRetainMin == null ? snapshotRetainMin : changelogRetainMin,
changelogTimeRetain == null ? snapshotTimeRetain : changelogTimeRetain,
changelogMaxDeletes == null ? snapshotMaxDeletes : changelogMaxDeletes);
changelogMaxDeletes == null ? snapshotMaxDeletes : changelogMaxDeletes,
consumerChangelogOnly);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,11 @@ public int expire() {
long maxExclusive = latestSnapshotId - retainMin + 1;

// unless 'consumer.changelog-only' is enabled, the snapshot being read by the consumer cannot be deleted
maxExclusive =
Math.min(maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));
if (!expireConfig.isConsumerChangelogOnly()) {
maxExclusive =
Math.min(
maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));
}

// protected by 'snapshot.expire.limit'
// (the maximum number of snapshots allowed to expire at a time)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.TestFileStore;
import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.FileIO;
Expand Down Expand Up @@ -661,6 +663,48 @@ public void testManifestFileSkippingSetFileNotFoundException() throws Exception
assertSnapshot(latestSnapshotId, allData, snapshotPositions);
}

/**
 * Checks both modes of the consumer-changelog-only flag on snapshot expiration: with the
 * default config a consumer at snapshot 3 keeps snapshot 3 as the earliest one, while with
 * the flag set to {@code true} expiration proceeds up to the latest snapshot.
 */
@Test
public void testConsumerChangelogOnly() throws Exception {
    final String consumerId = "myConsumer";
    final Duration timeRetain = Duration.ofMillis(Long.MAX_VALUE);

    List<KeyValue> writtenRecords = new ArrayList<>();
    List<Integer> positions = new ArrayList<>();
    commit(10, writtenRecords, positions);

    // Register a consumer at snapshot 3.
    ConsumerManager consumers = new ConsumerManager(fileIO, new Path(tempDir.toUri()));
    consumers.resetConsumer(consumerId, new Consumer(3));

    // Phase 1: default config (flag not set) -- the consumer is expected to hold back
    // snapshot expiration.
    store.newExpire(
                    ExpireConfig.builder()
                            .snapshotRetainMin(1)
                            .snapshotRetainMax(1)
                            .snapshotTimeRetain(timeRetain)
                            .build())
            .expire();

    // Snapshot 3 is still the earliest one because the consumer protects it.
    assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(3L);

    // Phase 2: same retention settings, but with consumerChangelogOnly=true -- the consumer
    // is expected NOT to hold back snapshot expiration any more.
    store.newExpire(
                    ExpireConfig.builder()
                            .snapshotRetainMin(1)
                            .snapshotRetainMax(1)
                            .snapshotTimeRetain(timeRetain)
                            .consumerChangelogOnly(true)
                            .build())
            .expire();

    int newestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue();
    long expectedEarliest = newestSnapshotId;
    // Only the latest snapshot is left once the consumer stops protecting snapshots.
    assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(expectedEarliest);
    assertSnapshot(newestSnapshotId, writtenRecords, positions);

    // Remove the consumer file first so that assertCleaned passes.
    consumers.deleteConsumer(consumerId);
    store.assertCleaned();
}

private TestFileStore createStore() {
ThreadLocalRandom random = ThreadLocalRandom.current();

Expand Down
Loading