Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.fluss.client.table.scanner.batch;

import org.apache.fluss.client.metadata.MetadataUpdater;
import org.apache.fluss.compression.ChunkedAllocationManager;
import org.apache.fluss.exception.LeaderNotAvailableException;
import org.apache.fluss.metadata.KvFormat;
import org.apache.fluss.metadata.SchemaGetter;
Expand Down Expand Up @@ -164,7 +165,12 @@ private List<InternalRow> parseLimitScanResponse(LimitScanResponse limitScanResp
}
} else {
LogRecordReadContext readContext =
LogRecordReadContext.createReadContext(tableInfo, false, null, schemaGetter);
LogRecordReadContext.createReadContext(
tableInfo,
false,
null,
schemaGetter,
new ChunkedAllocationManager.ChunkedFactory());
LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer);
Comment on lines 167 to 174
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In parseLimitScanResponse, the newly created LogRecordReadContext is never closed. Since LogRecordReadContext owns Arrow resources (VectorSchemaRoots + BufferAllocator), this will leak direct memory and native buffers. Please wrap the LogRecordReadContext in a try-with-resources (or otherwise ensure it’s closed) around the iteration over batches/records.

Copilot uses AI. Check for mistakes.
for (LogRecordBatch logRecordBatch : records.batches()) {
// A batch of log record maybe little more than limit, thus we need slice the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.fluss.client.metrics.ScannerMetricGroup;
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
import org.apache.fluss.cluster.BucketLocation;
import org.apache.fluss.compression.ChunkedAllocationManager;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.ApiException;
Expand Down Expand Up @@ -123,10 +124,14 @@ public LogFetcher(
SchemaGetter schemaGetter) {
this.tablePath = tableInfo.getTablePath();
this.isPartitioned = tableInfo.isPartitioned();
ChunkedAllocationManager.ChunkedFactory chunkedFactory =
new ChunkedAllocationManager.ChunkedFactory();
this.readContext =
LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter);
LogRecordReadContext.createReadContext(
tableInfo, false, projection, schemaGetter, chunkedFactory);
this.remoteReadContext =
LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter);
LogRecordReadContext.createReadContext(
tableInfo, true, projection, schemaGetter, chunkedFactory);
Comment on lines +127 to +134
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LogFetcher creates a ChunkedFactory instance and shares it across two LogRecordReadContexts, but it is not retained as a field and never closed. If ChunkedFactory retains native chunks (as its close() method suggests), those chunks may remain allocated after readContext/remoteReadContext are closed. Consider storing the factory as a member and closing it in LogFetcher.close() (or ensure allocator shutdown closes the factory automatically).

Copilot uses AI. Check for mistakes.
this.projection = projection;
this.logScannerStatus = logScannerStatus;
this.maxFetchBytes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.fluss.client.metrics.WriterMetricGroup;
import org.apache.fluss.cluster.BucketLocation;
import org.apache.fluss.cluster.Cluster;
import org.apache.fluss.compression.ChunkedAllocationManager;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.FlussRuntimeException;
Expand All @@ -36,7 +37,6 @@
import org.apache.fluss.row.arrow.ArrowWriter;
import org.apache.fluss.row.arrow.ArrowWriterPool;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
import org.apache.fluss.utils.CopyOnWriteMap;
import org.apache.fluss.utils.MathUtils;
import org.apache.fluss.utils.clock.Clock;
Expand All @@ -63,6 +63,7 @@

import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocatorUtil.createBufferAllocator;
import static org.apache.fluss.utils.Preconditions.checkNotNull;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
Expand Down Expand Up @@ -134,7 +135,7 @@ public final class RecordAccumulator {
Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes());

this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf);
this.bufferAllocator = new RootAllocator(Long.MAX_VALUE);
this.bufferAllocator = createBufferAllocator(new ChunkedAllocationManager.ChunkedFactory());
this.arrowWriterPool = new ArrowWriterPool(bufferAllocator);
this.incomplete = new IncompleteBatches();
this.nodesDrainIndex = new HashMap<>();
Expand Down
Loading