diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java index b0fcaeecb7..c1cc2b5b69 100644 --- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java +++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java @@ -222,11 +222,13 @@ public class MetricNames { // metrics for table bucket // -------------------------------------------------------------------------------------------- + // for tablet + public static final String LAKE_PENDING_RECORDS = "pendingRecords"; + // for log tablet public static final String LOG_NUM_SEGMENTS = "numSegments"; public static final String LOG_END_OFFSET = "endOffset"; public static final String REMOTE_LOG_SIZE = "size"; - public static final String LOG_LAKE_PENDING_RECORDS = "pendingRecords"; public static final String LOG_LAKE_TIMESTAMP_LAG = "timestampLag"; // for logic storage diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java index 89e435bbdd..5da5c67531 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java @@ -549,11 +549,13 @@ private void onBecomeNewLeader() { private void registerLakeTieringMetrics() { lakeTieringMetricGroup = bucketMetricGroup.addGroup("lakeTiering"); lakeTieringMetricGroup.gauge( - MetricNames.LOG_LAKE_PENDING_RECORDS, - () -> - getLakeLogEndOffset() < 0L - ? getLogHighWatermark() - getLogStartOffset() - : getLogHighWatermark() - getLakeLogEndOffset()); + MetricNames.LAKE_PENDING_RECORDS, + () -> { + long lakeLogEndOffset = getLakeLogEndOffset(); + return lakeLogEndOffset < 0L + ? getRowCount() + : getLogHighWatermark() - lakeLogEndOffset; + }); lakeTieringMetricGroup.gauge( MetricNames.LOG_LAKE_TIMESTAMP_LAG, () ->