From bdc904d69ca2f290858606ff42e22a7af4c8d878 Mon Sep 17 00:00:00 2001 From: Gio Gutierrez Date: Mon, 9 Feb 2026 13:08:13 -0500 Subject: [PATCH] [fluss] allow setting table.log.ttl to -1 for infinite retention --- .../fluss/config/ConfigurationUtils.java | 7 +++++++ .../fluss/config/ConfigurationTest.java | 16 ++++++++++++++++ .../log/remote/RemoteLogTabletTest.java | 19 +++++++++++++++++++ 3 files changed, 42 insertions(+) diff --git a/fluss-common/src/main/java/org/apache/fluss/config/ConfigurationUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/ConfigurationUtils.java index ad800aa8f6..d7f0177b06 100644 --- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigurationUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigurationUtils.java @@ -139,6 +139,10 @@ static Duration convertToDuration(Object o) { return (Duration) o; } + String s = o.toString().trim(); + if ("-1".equals(s)) { + return Duration.ofMillis(-1); + } return TimeUtils.parseDuration(o.toString()); } @@ -155,6 +159,9 @@ static String convertToString(Object o) { return (String) o; } else if (o.getClass() == Duration.class) { Duration duration = (Duration) o; + if (duration.toMillis() == -1) { + return "-1"; + } return TimeUtils.formatWithHighestUnit(duration); } else if (o instanceof List) { return ((List) o) diff --git a/fluss-common/src/test/java/org/apache/fluss/config/ConfigurationTest.java b/fluss-common/src/test/java/org/apache/fluss/config/ConfigurationTest.java index 77dfec1680..4b78a412fd 100644 --- a/fluss-common/src/test/java/org/apache/fluss/config/ConfigurationTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/config/ConfigurationTest.java @@ -108,6 +108,8 @@ void testGetterAndSetter() throws Exception { assertThat(conf.get(DURATION_OPTION).toMillis()).isEqualTo(3); conf.setString(DURATION_OPTION.key(), "3 s"); assertThat(conf.get(DURATION_OPTION).toMillis()).isEqualTo(3000); + conf.setString(DURATION_OPTION.key(), "-1"); + assertThat(conf.get(DURATION_OPTION)).isEqualTo(Duration.ofMillis(-1)); conf.setBytes("test-bytes-key", new byte[] {1, 2, 3, 4, 5}); assertThat(conf.getBytes("test-bytes-key", new byte[0]).length).isEqualTo(5); @@ -428,6 +430,20 @@ void testToMap() { assertThat(conf.toMap().get(DURATION_OPTION.key())).isEqualTo("3 s"); } + @Test + void testTableLogTtlMinusOneParseAndRoundTrip() { + // Parse "-1" as Duration (e.g. table.log.ttl = -1 means never delete logs) + final Configuration conf = new Configuration(); + conf.setString(ConfigOptions.TABLE_LOG_TTL.key(), "-1"); + assertThat(conf.get(ConfigOptions.TABLE_LOG_TTL)).isEqualTo(Duration.ofMillis(-1)); + assertThat(conf.get(ConfigOptions.TABLE_LOG_TTL).toMillis()).isEqualTo(-1L); + + // Round-trip: Duration.ofMillis(-1) serializes back to "-1" + final Configuration conf2 = new Configuration(); + conf2.set(ConfigOptions.TABLE_LOG_TTL, Duration.ofMillis(-1)); + assertThat(conf2.toMap().get(ConfigOptions.TABLE_LOG_TTL.key())).isEqualTo("-1"); + } + @Test void testMapNotContained() { final Configuration conf = new Configuration(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java index 0d571e1ea5..f3880e7832 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTabletTest.java @@ -17,6 +17,7 @@ package org.apache.fluss.server.log.remote; +import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.remote.RemoteLogSegment; import org.apache.fluss.server.log.LogTablet; @@ -24,6 +25,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -181,6 +183,23 @@ void testFindRemoteLogSegmentByTimestamp(boolean partitionTable) throws Exceptio assertThat(remoteLogTablet.findSegmentByTimestamp(51L)).isNull(); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testExpiredSegmentsNeverDeletedWhenLogTtlMinusOne(boolean partitionTable) + throws Exception { + conf.set(ConfigOptions.TABLE_LOG_TTL, Duration.ofMillis(-1)); + LogTablet logTablet = makeLogTabletAndAddSegments(partitionTable); + RemoteLogTablet remoteLogTablet = buildRemoteLogTablet(logTablet); + List remoteLogSegmentList = createRemoteLogSegmentList(logTablet); + remoteLogTablet.addAndDeleteLogSegments(remoteLogSegmentList, Collections.emptyList()); + assertThat(remoteLogTablet.allRemoteLogSegments()).hasSize(5); + manualClock.advanceTime(Duration.ofDays(8)); + List expired = + remoteLogTablet.expiredRemoteLogSegments(manualClock.milliseconds(), null); + assertThat(expired).isEmpty(); + assertThat(remoteLogTablet.allRemoteLogSegments()).hasSize(5); + } + RemoteLogSegment createLogSegmentWithMaxTimestamp( LogTablet logTablet, long timestamp,