From 33945f226b79db066ba9d5e444ee31055c625e31 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Tue, 31 Mar 2026 20:01:40 +0800 Subject: [PATCH 1/4] [client] Replace Netty PooledByteBufAllocator with bump-pointer ChunkedAllocationManager for Arrow memory allocation. --- .../scanner/batch/LimitBatchScanner.java | 8 +- .../client/table/scanner/log/LogFetcher.java | 9 +- .../fluss/client/write/RecordAccumulator.java | 4 +- .../compression/ChunkedAllocationManager.java | 368 ++++++++++++++++++ .../fluss/record/LogRecordReadContext.java | 56 ++- .../apache/arrow/memory/AllocatorUtil.java | 39 ++ 6 files changed, 474 insertions(+), 10 deletions(-) create mode 100644 fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java create mode 100644 fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java index f4a3be53f6..d3fd60bf37 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java @@ -18,6 +18,7 @@ package org.apache.fluss.client.table.scanner.batch; import org.apache.fluss.client.metadata.MetadataUpdater; +import org.apache.fluss.compression.ChunkedAllocationManager; import org.apache.fluss.exception.LeaderNotAvailableException; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.SchemaGetter; @@ -164,7 +165,12 @@ private List parseLimitScanResponse(LimitScanResponse limitScanResp } } else { LogRecordReadContext readContext = - LogRecordReadContext.createReadContext(tableInfo, false, null, schemaGetter); + LogRecordReadContext.createReadContext( + tableInfo, + false, + null, + schemaGetter, + new 
ChunkedAllocationManager.ChunkedFactory()); LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer); for (LogRecordBatch logRecordBatch : records.batches()) { // A batch of log record maybe little more than limit, thus we need slice the diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index 505b82c127..a2615eedeb 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -23,6 +23,7 @@ import org.apache.fluss.client.metrics.ScannerMetricGroup; import org.apache.fluss.client.table.scanner.RemoteFileDownloader; import org.apache.fluss.cluster.BucketLocation; +import org.apache.fluss.compression.ChunkedAllocationManager; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.ApiException; @@ -128,10 +129,14 @@ public LogFetcher( SchemaGetter schemaGetter) { this.tablePath = tableInfo.getTablePath(); this.isPartitioned = tableInfo.isPartitioned(); + ChunkedAllocationManager.ChunkedFactory chunkedFactory = + new ChunkedAllocationManager.ChunkedFactory(); this.readContext = - LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter); + LogRecordReadContext.createReadContext( + tableInfo, false, projection, schemaGetter, chunkedFactory); this.remoteReadContext = - LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter); + LogRecordReadContext.createReadContext( + tableInfo, true, projection, schemaGetter, chunkedFactory); this.projection = projection; this.cachedPbPredicate = recordBatchFilter != null diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java 
index 5f35b1e1cb..30f2b9b7c4 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -22,6 +22,7 @@ import org.apache.fluss.client.metrics.WriterMetricGroup; import org.apache.fluss.cluster.BucketLocation; import org.apache.fluss.cluster.Cluster; +import org.apache.fluss.compression.ChunkedAllocationManager; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FlussRuntimeException; @@ -63,6 +64,7 @@ import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; +import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocatorUtil.createBufferAllocator; import static org.apache.fluss.utils.Preconditions.checkNotNull; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache @@ -134,7 +136,7 @@ public final class RecordAccumulator { Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes()); this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf); - this.bufferAllocator = BufferAllocatorUtil.createBufferAllocator(); + this.bufferAllocator = createBufferAllocator(new ChunkedAllocationManager.ChunkedFactory()); this.arrowWriterPool = new ArrowWriterPool(bufferAllocator); this.incomplete = new IncompleteBatches(); this.nodesDrainIndex = new HashMap<>(); diff --git a/fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java b/fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java new file mode 100644 index 0000000000..6c23950370 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java @@ -0,0 +1,368 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + 
* contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.compression; + +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationManager; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ReferenceManager; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.util.MemoryUtil; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * An {@link AllocationManager} that packs small allocations into large pre-allocated chunks using a + * bump-pointer strategy, specifically designed for Arrow column decompression in high-column-count + * scenarios (e.g., 1000+ columns). + * + *

<h3>Why not Netty's PooledByteBufAllocator?</h3>

+ * + *

<p>Arrow column decompression produces a burst of ~3000 small, variable-sized allocations (1000 + * columns x ~3 buffers each: validity, offset, data). Netty's allocator is optimized for a + * different pattern and causes problems here: + * + * <table>
+ * <tr><th>Aspect</th><th>Netty PooledByteBufAllocator</th><th>ChunkedAllocationManager</th></tr>
+ * <tr><td>Small allocation</td> + * <td>Routes through size-class subpages (tiny/small/normal). Different sizes land on + * different subpages within different pages of the same or different chunks, + * causing cross-chunk fragmentation.</td> + * <td>Sequential bump-pointer within the current chunk. All sizes packed contiguously into + * one chunk regardless of size class.</td></tr>
+ * <tr><td>Chunk selection</td> + * <td>Tries q050 → q025 → q000 → qInit → q075, preferring + * moderately-used chunks. May spread allocations across multiple chunks.</td> + * <td>Always uses the current active chunk. Only switches when full.</td></tr>
+ * <tr><td>Chunk reclamation</td> + * <td>A chunk is freed only when it is in {@code q000} and usage drops to 0%. But + * cross-size-class subpage pinning often prevents any single chunk from reaching 0%. + * The first chunk per arena (in {@code qInit}) is never freed.</td> + * <td>When all sub-allocations in a chunk are released ({@code subAllocCount} reaches 0), + * the chunk is immediately recycled to the free-list.</td></tr>
+ * <tr><td>Thread-local caching</td> + * <td>{@code PoolThreadCache} holds released buffers in thread-local caches, delaying their + * return to the chunk. Cache is trimmed only every 8192 allocations or on thread exit, + * making chunk usage appear higher than actual.</td> + * <td>No thread-local caching. Release decrements {@code subAllocCount} immediately.</td></tr>
+ * <tr><td>Typical result for 1000-column decompression</td> + * <td>3000 buffers spread across 2-5 chunks. After batch release, chunks remain partially + * used due to subpage pinning + thread cache. Memory grows over successive batches + * → OOM.</td> + * <td>3000 buffers land in 1-2 chunks. After batch release, all chunks immediately + * recyclable. Stable memory footprint across batches.</td></tr>
+ * </table> + * + *

<h3>How it works</h3>

+ * + * + * + *

<h3>Batch-friendly lifecycle</h3>

+ * + * <pre>
+ *   Batch N:  allocator.buffer() x 3000  →  all land in chunk A (bump pointer)
+ *             process batch...
+ *             release all buffers          →  subAllocCount = 0 → chunk A recycled
+ *
+ *   Batch N+1: allocator.buffer() x 3000  →  reuse chunk A (zero malloc/free)
+ * </pre>
+ * + *

<h3>Usage</h3>

+ * + *
<pre>{@code
+ * BufferAllocator allocator = new RootAllocator(
+ *         BaseAllocator.configBuilder()
+ *                 .allocationManagerFactory(new ChunkedAllocationManager.ChunkedFactory())
+ *                 .build());
+ * }</pre>
+ * + *

For allocations >= chunkSize, a dedicated memory region is allocated directly (no bump + * pointer), behaving identically to {@link UnsafeAllocationManager}. + */ +public class ChunkedAllocationManager extends AllocationManager { + + /** 8-byte alignment for all sub-allocations within a chunk. */ + private static final long ALIGNMENT = 8; + + /** Default chunk size: 4MB (matches Netty 4.1+ maxOrder=9). */ + private static final long DEFAULT_CHUNK_SIZE = 4L * 1024 * 1024; + + /** Default maximum number of empty chunks to keep in the free-list. */ + private static final int DEFAULT_MAX_FREE_CHUNKS = 3; + + private final long allocatedSize; + + // --- Fields for sub-allocation (small request carved from a shared chunk) --- + private final Chunk chunk; + private final long offsetInChunk; + + // --- Fields for direct allocation (large request, owns its own memory) --- + private final long directAddress; + + /** Sub-allocation carved from a shared {@link Chunk}. */ + private ChunkedAllocationManager( + BufferAllocator accountingAllocator, Chunk chunk, long offset, long size) { + super(accountingAllocator); + this.chunk = chunk; + this.offsetInChunk = offset; + this.allocatedSize = size; + this.directAddress = 0; + } + + /** Direct allocation for oversized requests (>= chunkSize). Owns its own memory region. 
*/ + private ChunkedAllocationManager(BufferAllocator accountingAllocator, long address, long size) { + super(accountingAllocator); + this.chunk = null; + this.offsetInChunk = 0; + this.allocatedSize = size; + this.directAddress = address; + } + + @Override + public long getSize() { + return allocatedSize; + } + + @Override + protected long memoryAddress() { + if (chunk != null) { + return chunk.address + offsetInChunk; + } + return directAddress; + } + + @Override + protected void release0() { + if (chunk != null) { + chunk.releaseSubAllocation(); + } else { + MemoryUtil.UNSAFE.freeMemory(directAddress); + } + } + + // ------------------------------------------------------------------------- + // Chunk — a contiguous memory region with bump-pointer sub-allocation + // ------------------------------------------------------------------------- + + /** + * A contiguous native memory region that holds multiple small allocations via bump-pointer. + * Reference-counted: when all sub-allocations are released (count reaches 0), the chunk is + * recycled back to the factory's free-list. + */ + static class Chunk { + final long address; + final long capacity; + /** Bump pointer — only accessed under the factory's synchronized lock. */ + long used; + /** + * Number of active sub-allocations. Decremented from arbitrary threads when ArrowBuf + * instances are released. + */ + final AtomicInteger subAllocCount = new AtomicInteger(0); + /** Back-reference to the owning factory for recycling on drain. */ + final ChunkedFactory factory; + + Chunk(long capacity, ChunkedFactory factory) { + this.address = MemoryUtil.UNSAFE.allocateMemory(capacity); + this.capacity = capacity; + this.used = 0; + this.factory = factory; + } + + /** Returns true if this chunk has room for an aligned allocation of the given size. */ + boolean hasRoom(long alignedSize) { + return used + alignedSize <= capacity; + } + + /** + * Bump-allocates {@code alignedSize} bytes from this chunk. 
Must be called under the + * factory lock. + * + * @return the offset within this chunk where the allocation starts. + */ + long bumpAllocate(long alignedSize) { + long offset = used; + used += alignedSize; + subAllocCount.incrementAndGet(); + return offset; + } + + /** + * Called when a sub-allocation's ArrowBuf is released. If the chunk is now empty (all + * sub-allocations freed), notifies the factory for recycling. + */ + void releaseSubAllocation() { + if (subAllocCount.decrementAndGet() == 0) { + factory.onChunkDrained(this); + } + } + + /** Resets the bump pointer so the chunk can be reused for new allocations. */ + void resetBump() { + used = 0; + // subAllocCount is already 0 at this point. + } + + /** Frees the underlying native memory. */ + void destroy() { + MemoryUtil.UNSAFE.freeMemory(address); + } + } + + // ------------------------------------------------------------------------- + // Factory + // ------------------------------------------------------------------------- + + /** + * Factory that creates {@link ChunkedAllocationManager} instances. + * + *

<p>Small allocations (&lt; chunkSize) are packed into the current active chunk via bump-pointer. + * When the active chunk is full, a recycled or freshly-allocated chunk is used. Large + * allocations (&gt;= chunkSize) get their own dedicated memory region. + * + *

This factory is thread-safe: {@link #create} and {@link #onChunkDrained} are {@code + * synchronized}. + */ + public static class ChunkedFactory implements AllocationManager.Factory { + + private static final ArrowBuf EMPTY = + new ArrowBuf(ReferenceManager.NO_OP, null, 0, MemoryUtil.UNSAFE.allocateMemory(0)); + + private final long chunkSize; + private final int maxFreeChunks; + + /** The chunk currently receiving bump allocations. May be null initially. */ + private Chunk activeChunk; + + /** Pool of empty chunks available for reuse. */ + private final Deque freeChunks = new ArrayDeque<>(); + + /** Creates a factory with default chunk size (4MB) and max 3 free chunks. */ + public ChunkedFactory() { + this(DEFAULT_CHUNK_SIZE, DEFAULT_MAX_FREE_CHUNKS); + } + + /** + * Creates a factory with the given chunk size and free-chunk pool limit. + * + * @param chunkSize maximum size of each chunk (bytes). Allocations >= this go direct. + * @param maxFreeChunks maximum number of empty chunks to keep cached for reuse. + */ + public ChunkedFactory(long chunkSize, int maxFreeChunks) { + this.chunkSize = chunkSize; + this.maxFreeChunks = maxFreeChunks; + } + + @Override + public synchronized AllocationManager create( + BufferAllocator accountingAllocator, long size) { + if (size >= chunkSize) { + // Large allocation: give it its own memory region. + long address = MemoryUtil.UNSAFE.allocateMemory(size); + return new ChunkedAllocationManager(accountingAllocator, address, size); + } + + // Align to 8 bytes for safe direct-memory access. + long alignedSize = (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1); + + if (activeChunk == null || !activeChunk.hasRoom(alignedSize)) { + // Current chunk is full or doesn't exist — obtain a recycled or new chunk. 
+ activeChunk = obtainChunk(); + } + + long offset = activeChunk.bumpAllocate(alignedSize); + return new ChunkedAllocationManager(accountingAllocator, activeChunk, offset, size); + } + + @Override + public ArrowBuf empty() { + return EMPTY; + } + + /** + * Obtains a chunk for use as the new active chunk. Prefers recycled chunks from the + * free-list; falls back to allocating a fresh one. + */ + private Chunk obtainChunk() { + Chunk recycled = freeChunks.pollFirst(); + if (recycled != null) { + recycled.resetBump(); + return recycled; + } + return new Chunk(chunkSize, this); + } + + /** + * Called when a chunk's sub-allocation count reaches zero (all ArrowBufs released). + * Recycles the chunk if possible, or frees its memory if the free-list is full. + * + *

Thread-safety note: between {@code subAllocCount} reaching 0 and this method acquiring + * the lock, another thread may call {@link #create} and bump-allocate from this chunk (if + * it is still the active chunk). The guard {@code chunk.subAllocCount.get() > 0} handles + * this race. + */ + synchronized void onChunkDrained(Chunk chunk) { + // Guard: if new sub-allocations were added between refCount=0 and now, skip. + if (chunk.subAllocCount.get() > 0) { + return; + } + + if (chunk == activeChunk) { + // Still the active chunk — just reset bump pointer for continued use. + chunk.resetBump(); + } else if (freeChunks.size() < maxFreeChunks) { + // Not active, pool has room — recycle. + freeChunks.offerFirst(chunk); + } else { + // Pool is full — free native memory. + chunk.destroy(); + } + } + + /** + * Closes this factory, freeing all cached chunks. Active chunks with outstanding + * sub-allocations will be freed when their last ArrowBuf is released. + */ + public synchronized void close() { + while (!freeChunks.isEmpty()) { + freeChunks.poll().destroy(); + } + if (activeChunk != null && activeChunk.subAllocCount.get() == 0) { + activeChunk.destroy(); + } + activeChunk = null; + } + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 29175aabdc..7ba0bb5d06 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -18,6 +18,7 @@ package org.apache.fluss.record; import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.compression.ChunkedAllocationManager; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaGetter; @@ -25,6 +26,8 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.InternalRow.FieldGetter; import 
org.apache.fluss.row.ProjectedRow; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationManager; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocatorUtil; import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; @@ -68,6 +71,29 @@ public static LogRecordReadContext createReadContext( boolean readFromRemote, @Nullable Projection projection, SchemaGetter schemaGetter) { + return createReadContext( + tableInfo, + readFromRemote, + projection, + schemaGetter, + new ChunkedAllocationManager.ChunkedFactory()); + } + + /** + * Creates a {@link LogRecordReadContext} with a custom {@link AllocationManager.Factory}. + * + * @param tableInfo the table info of the table to read + * @param readFromRemote whether the data is read from remote storage + * @param projection the projection to apply, or null for all fields + * @param schemaGetter the schema getter to resolve schema by id + * @param allocationManagerFactory the factory for creating Arrow memory allocations + */ + public static LogRecordReadContext createReadContext( + TableInfo tableInfo, + boolean readFromRemote, + @Nullable Projection projection, + SchemaGetter schemaGetter, + AllocationManager.Factory allocationManagerFactory) { RowType rowType = tableInfo.getRowType(); LogFormat logFormat = tableInfo.getTableConfig().getLogFormat(); // only for arrow log format, the projection can be push downed to the server side @@ -84,7 +110,12 @@ public static LogRecordReadContext createReadContext( // so set the rowType as is. 
int[] selectedFields = projection.getProjection(); return createArrowReadContext( - rowType, schemaId, selectedFields, false, schemaGetter); + rowType, + schemaId, + selectedFields, + false, + schemaGetter, + allocationManagerFactory); } else { // arrow data that returned from server has been projected (in order) RowType projectedRowType = projection.projectInOrder(rowType); @@ -95,7 +126,8 @@ public static LogRecordReadContext createReadContext( schemaId, selectedFields, projectionPushDowned, - schemaGetter); + schemaGetter, + allocationManagerFactory); } } else if (logFormat == LogFormat.INDEXED) { int[] selectedFields = projection.getProjection(); @@ -113,9 +145,10 @@ private static LogRecordReadContext createArrowReadContext( int schemaId, int[] selectedFields, boolean projectionPushDowned, - SchemaGetter schemaGetter) { + SchemaGetter schemaGetter, + AllocationManager.Factory allocationManagerFactory) { // TODO: use a more reasonable memory limit - BufferAllocator allocator = BufferAllocatorUtil.createBufferAllocator(); + BufferAllocator allocator = BufferAllocatorUtil.createBufferAllocator(allocationManagerFactory); FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields); return new LogRecordReadContext( LogFormat.ARROW, @@ -139,7 +172,13 @@ private static LogRecordReadContext createArrowReadContext( public static LogRecordReadContext createArrowReadContext( RowType rowType, int schemaId, SchemaGetter schemaGetter) { int[] selectedFields = IntStream.range(0, rowType.getFieldCount()).toArray(); - return createArrowReadContext(rowType, schemaId, selectedFields, false, schemaGetter); + return createArrowReadContext( + rowType, + schemaId, + selectedFields, + false, + schemaGetter, + new ChunkedAllocationManager.ChunkedFactory()); } @VisibleForTesting @@ -150,7 +189,12 @@ public static LogRecordReadContext createArrowReadContext( boolean projectionPushDowned) { int[] selectedFields = IntStream.range(0, 
rowType.getFieldCount()).toArray(); return createArrowReadContext( - rowType, schemaId, selectedFields, projectionPushDowned, schemaGetter); + rowType, + schemaId, + selectedFields, + projectionPushDowned, + schemaGetter, + new ChunkedAllocationManager.ChunkedFactory()); } /** diff --git a/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java new file mode 100644 index 0000000000..ac4eb97a92 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.shaded.arrow.org.apache.arrow.memory; + +import org.apache.fluss.record.FlussRoundingPolicy; + +import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.configBuilder; + +/** Util to create UNSAFE_ALLOCATOR. 
*/ +public class AllocatorUtil { + + private AllocatorUtil() {} + + public static BufferAllocator createBufferAllocator( + AllocationManager.Factory allocationManagerFactory) { + return new RootAllocator( + configBuilder() + .listener(AllocationListener.NOOP) + .maxAllocation(Long.MAX_VALUE) + .roundingPolicy(FlussRoundingPolicy.DEFAULT_ROUNDING_POLICY) + .allocationManagerFactory(allocationManagerFactory) + .build()); + } +} From 1bc669987b060dfc4c2f68e4476951bbc3123264 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Mon, 13 Apr 2026 19:38:01 +0800 Subject: [PATCH 2/4] modified based on CR --- .../scanner/batch/LimitBatchScanner.java | 13 +- .../client/table/scanner/log/LogFetcher.java | 7 +- .../fluss/client/write/RecordAccumulator.java | 13 +- .../fluss/record/LogRecordReadContext.java | 8 +- .../apache/arrow/memory/AllocatorUtil.java | 39 ---- .../arrow/memory/ArrowRoundingPolicy.java | 2 +- .../arrow/memory/BufferAllocatorUtil.java | 21 +- .../memory}/ChunkedAllocationManager.java | 26 ++- .../memory/ChunkedAllocationManagerTest.java | 220 ++++++++++++++++++ .../org/apache/fluss/server/kv/KvManager.java | 4 +- 10 files changed, 276 insertions(+), 77 deletions(-) delete mode 100644 fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java rename fluss-common/src/main/java/org/apache/fluss/{row => shaded/arrow/org/apache}/arrow/memory/ArrowRoundingPolicy.java (98%) rename fluss-common/src/main/java/org/apache/fluss/{row => shaded/arrow/org/apache}/arrow/memory/BufferAllocatorUtil.java (57%) rename fluss-common/src/main/java/org/apache/fluss/{compression => shaded/arrow/org/apache/arrow/memory}/ChunkedAllocationManager.java (95%) create mode 100644 fluss-common/src/test/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManagerTest.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java 
b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java index d3fd60bf37..efb6f7a18d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java @@ -18,7 +18,6 @@ package org.apache.fluss.client.table.scanner.batch; import org.apache.fluss.client.metadata.MetadataUpdater; -import org.apache.fluss.compression.ChunkedAllocationManager; import org.apache.fluss.exception.LeaderNotAvailableException; import org.apache.fluss.metadata.KvFormat; import org.apache.fluss.metadata.SchemaGetter; @@ -38,6 +37,7 @@ import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.LimitScanRequest; import org.apache.fluss.rpc.messages.LimitScanResponse; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.CloseableIterator; import org.apache.fluss.utils.SchemaUtil; @@ -67,6 +67,8 @@ public class LimitBatchScanner implements BatchScanner { private final SchemaGetter schemaGetter; private final KvFormat kvFormat; private final int targetSchemaId; + /** The chunked allocation manager factory to reuse memory for arrow log write batch. */ + private final ChunkedAllocationManager.ChunkedFactory chunkedFactory; /** * A cache for schema projection mapping from source schema to target. 
Use HashMap here, because @@ -117,6 +119,7 @@ public LimitBatchScanner( this.kvFormat = tableInfo.getTableConfig().getKvFormat(); this.endOfInput = false; + this.chunkedFactory = new ChunkedAllocationManager.ChunkedFactory(); } @Nullable @@ -166,11 +169,7 @@ private List parseLimitScanResponse(LimitScanResponse limitScanResp } else { LogRecordReadContext readContext = LogRecordReadContext.createReadContext( - tableInfo, - false, - null, - schemaGetter, - new ChunkedAllocationManager.ChunkedFactory()); + tableInfo, false, null, schemaGetter, chunkedFactory); LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer); for (LogRecordBatch logRecordBatch : records.batches()) { // A batch of log record maybe little more than limit, thus we need slice the @@ -209,5 +208,7 @@ private InternalRow maybeProject(InternalRow originRow) { @Override public void close() throws IOException { scanFuture.cancel(true); + // Release off-heap memory held by the chunked allocation manager factory. 
+ chunkedFactory.close(); } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index a2615eedeb..2625716cde 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -23,7 +23,6 @@ import org.apache.fluss.client.metrics.ScannerMetricGroup; import org.apache.fluss.client.table.scanner.RemoteFileDownloader; import org.apache.fluss.cluster.BucketLocation; -import org.apache.fluss.compression.ChunkedAllocationManager; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.ApiException; @@ -54,6 +53,7 @@ import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.rpc.util.PredicateMessageUtils; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager; import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.fluss.utils.IOUtils; import org.apache.fluss.utils.Projection; @@ -95,6 +95,7 @@ public class LogFetcher implements Closeable { // currently can only do project when generate scanRecord instead of doing project while read // bytes from remote file. 
private final LogRecordReadContext remoteReadContext; + private final ChunkedAllocationManager.ChunkedFactory chunkedFactory; @Nullable private final Projection projection; @Nullable private final org.apache.fluss.rpc.messages.PbPredicate cachedPbPredicate; private final int filterSchemaId; @@ -129,8 +130,7 @@ public LogFetcher( SchemaGetter schemaGetter) { this.tablePath = tableInfo.getTablePath(); this.isPartitioned = tableInfo.isPartitioned(); - ChunkedAllocationManager.ChunkedFactory chunkedFactory = - new ChunkedAllocationManager.ChunkedFactory(); + this.chunkedFactory = new ChunkedAllocationManager.ChunkedFactory(); this.readContext = LogRecordReadContext.createReadContext( tableInfo, false, projection, schemaGetter, chunkedFactory); @@ -608,6 +608,7 @@ public synchronized void close() throws IOException { IOUtils.closeQuietly(remoteLogDownloader, "remoteLogDownloader"); readContext.close(); remoteReadContext.close(); + chunkedFactory.close(); isClosed = true; LOG.info("Fetcher for {} is closed.", tablePath); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index 30f2b9b7c4..9e08e1a95d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -22,7 +22,6 @@ import org.apache.fluss.client.metrics.WriterMetricGroup; import org.apache.fluss.cluster.BucketLocation; import org.apache.fluss.cluster.Cluster; -import org.apache.fluss.compression.ChunkedAllocationManager; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.FlussRuntimeException; @@ -36,8 +35,8 @@ import org.apache.fluss.record.LogRecordBatchStatisticsCollector; import org.apache.fluss.row.arrow.ArrowWriter; import org.apache.fluss.row.arrow.ArrowWriterPool; -import 
org.apache.fluss.row.arrow.memory.BufferAllocatorUtil; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager; import org.apache.fluss.utils.CopyOnWriteMap; import org.apache.fluss.utils.MathUtils; import org.apache.fluss.utils.clock.Clock; @@ -64,7 +63,7 @@ import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; -import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocatorUtil.createBufferAllocator; +import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocatorUtil.createBufferAllocator; import static org.apache.fluss.utils.Preconditions.checkNotNull; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache @@ -101,6 +100,9 @@ public final class RecordAccumulator { /** The arrow buffer allocator to allocate memory for arrow log write batch. */ private final BufferAllocator bufferAllocator; + /** The chunked allocation manager factory, stored for explicit native memory release. */ + private final ChunkedAllocationManager.ChunkedFactory chunkedFactory; + /** The pool of lazily created arrow {@link ArrowWriter}s for arrow log write batch. 
*/ private final ArrowWriterPool arrowWriterPool; @@ -136,7 +138,8 @@ public final class RecordAccumulator { Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes()); this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf); - this.bufferAllocator = createBufferAllocator(new ChunkedAllocationManager.ChunkedFactory()); + this.chunkedFactory = new ChunkedAllocationManager.ChunkedFactory(); + this.bufferAllocator = createBufferAllocator(chunkedFactory); this.arrowWriterPool = new ArrowWriterPool(bufferAllocator); this.incomplete = new IncompleteBatches(); this.nodesDrainIndex = new HashMap<>(); @@ -966,6 +969,8 @@ public void close() { // Release all the memory segments. bufferAllocator.releaseBytes(bufferAllocator.getAllocatedMemory()); bufferAllocator.close(); + // Release native memory held by the chunked allocation manager factory. + chunkedFactory.close(); } /** Per table bucket and write batches. */ diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 7ba0bb5d06..b85547fc10 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -18,7 +18,6 @@ package org.apache.fluss.record; import org.apache.fluss.annotation.VisibleForTesting; -import org.apache.fluss.compression.ChunkedAllocationManager; import org.apache.fluss.metadata.LogFormat; import org.apache.fluss.metadata.Schema; import org.apache.fluss.metadata.SchemaGetter; @@ -27,9 +26,9 @@ import org.apache.fluss.row.InternalRow.FieldGetter; import org.apache.fluss.row.ProjectedRow; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationManager; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocatorUtil; -import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil; import 
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocatorUtil; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; @@ -148,7 +147,8 @@ private static LogRecordReadContext createArrowReadContext( SchemaGetter schemaGetter, AllocationManager.Factory allocationManagerFactory) { // TODO: use a more reasonable memory limit - BufferAllocator allocator = BufferAllocatorUtil.createBufferAllocator(allocationManagerFactory); + BufferAllocator allocator = + BufferAllocatorUtil.createBufferAllocator(allocationManagerFactory); FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields); return new LogRecordReadContext( LogFormat.ARROW, diff --git a/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java deleted file mode 100644 index ac4eb97a92..0000000000 --- a/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.shaded.arrow.org.apache.arrow.memory; - -import org.apache.fluss.record.FlussRoundingPolicy; - -import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.configBuilder; - -/** Util to create UNSAFE_ALLOCATOR. */ -public class AllocatorUtil { - - private AllocatorUtil() {} - - public static BufferAllocator createBufferAllocator( - AllocationManager.Factory allocationManagerFactory) { - return new RootAllocator( - configBuilder() - .listener(AllocationListener.NOOP) - .maxAllocation(Long.MAX_VALUE) - .roundingPolicy(FlussRoundingPolicy.DEFAULT_ROUNDING_POLICY) - .allocationManagerFactory(allocationManagerFactory) - .build()); - } -} diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ArrowRoundingPolicy.java similarity index 98% rename from fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java rename to fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ArrowRoundingPolicy.java index bfd65fd220..9e7926b85d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java +++ b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ArrowRoundingPolicy.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.fluss.row.arrow.memory; +package org.apache.fluss.shaded.arrow.org.apache.arrow.memory; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.rounding.RoundingPolicy; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.util.CommonUtil; diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/BufferAllocatorUtil.java similarity index 57% rename from fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java rename to fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/BufferAllocatorUtil.java index 3b0e2914b3..03c408de0c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java +++ b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/BufferAllocatorUtil.java @@ -17,19 +17,26 @@ * under the License. */ -package org.apache.fluss.row.arrow.memory; +package org.apache.fluss.shaded.arrow.org.apache.arrow.memory; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationListener; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; +import javax.annotation.Nullable; -import static org.apache.fluss.row.arrow.memory.ArrowRoundingPolicy.ARROW_ROUNDING_POLICY; +import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowRoundingPolicy.ARROW_ROUNDING_POLICY; /** Utility class for creating Arrow BufferAllocators with the custom ArrowRoundingPolicy. */ public class BufferAllocatorUtil { /** Creates a {@link BufferAllocator} configured with the {@link ArrowRoundingPolicy}. 
*/ - public static BufferAllocator createBufferAllocator() { - return new RootAllocator(AllocationListener.NOOP, Long.MAX_VALUE, ARROW_ROUNDING_POLICY); + public static BufferAllocator createBufferAllocator( + @Nullable AllocationManager.Factory allocationManagerFactory) { + ImmutableConfig.Builder builder = + ImmutableConfig.builder() + .listener(AllocationListener.NOOP) + .maxAllocation(Long.MAX_VALUE) + .roundingPolicy(ARROW_ROUNDING_POLICY); + if (allocationManagerFactory != null) { + builder.allocationManagerFactory(allocationManagerFactory); + } + return new RootAllocator(builder.build()); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManager.java similarity index 95% rename from fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java rename to fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManager.java index 6c23950370..e326c638b8 100644 --- a/fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java +++ b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManager.java @@ -15,12 +15,9 @@ * limitations under the License. */ -package org.apache.fluss.compression; +package org.apache.fluss.shaded.arrow.org.apache.arrow.memory; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationManager; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowBuf; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ReferenceManager; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.util.MemoryUtil; import java.util.ArrayDeque; @@ -109,7 +106,7 @@ * } * *

For allocations >= chunkSize, a dedicated memory region is allocated directly (no bump - * pointer), behaving identically to {@link UnsafeAllocationManager}. + * pointer), behaving identically to {@code UnsafeAllocationManager}. */ public class ChunkedAllocationManager extends AllocationManager { @@ -257,9 +254,6 @@ void destroy() { */ public static class ChunkedFactory implements AllocationManager.Factory { - private static final ArrowBuf EMPTY = - new ArrowBuf(ReferenceManager.NO_OP, null, 0, MemoryUtil.UNSAFE.allocateMemory(0)); - private final long chunkSize; private final int maxFreeChunks; @@ -288,7 +282,7 @@ public ChunkedFactory(long chunkSize, int maxFreeChunks) { @Override public synchronized AllocationManager create( BufferAllocator accountingAllocator, long size) { - if (size >= chunkSize) { + if (size > chunkSize) { // Large allocation: give it its own memory region. long address = MemoryUtil.UNSAFE.allocateMemory(size); return new ChunkedAllocationManager(accountingAllocator, address, size); @@ -308,7 +302,7 @@ public synchronized AllocationManager create( @Override public ArrowBuf empty() { - return EMPTY; + return NettyAllocationManager.EMPTY_BUFFER; } /** @@ -364,5 +358,15 @@ public synchronized void close() { } activeChunk = null; } + + @VisibleForTesting + Deque getFreeChunks() { + return freeChunks; + } + + @VisibleForTesting + Chunk getActiveChunk() { + return activeChunk; + } } } diff --git a/fluss-common/src/test/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManagerTest.java b/fluss-common/src/test/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManagerTest.java new file mode 100644 index 0000000000..4662ac7aed --- /dev/null +++ b/fluss-common/src/test/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManagerTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.shaded.arrow.org.apache.arrow.memory; + +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager.ChunkedFactory; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ChunkedAllocationManager}. */ +class ChunkedAllocationManagerTest { + + /** Use a small chunk size (1KB) for testing to make chunk transitions easy to trigger. */ + private static final long TEST_CHUNK_SIZE = 1024; + + private static final int TEST_MAX_FREE_CHUNKS = 2; + + private ChunkedFactory factory; + private BufferAllocator allocator; + + @BeforeEach + void setUp() { + factory = new ChunkedFactory(TEST_CHUNK_SIZE, TEST_MAX_FREE_CHUNKS); + allocator = BufferAllocatorUtil.createBufferAllocator(factory); + } + + @AfterEach + void tearDown() { + allocator.close(); + factory.close(); + } + + @Test + void testSmallAllocationPackedIntoSameChunk() { + // Multiple small allocations should land in the same chunk (contiguous addresses). 
+ ArrowBuf buf1 = allocator.buffer(64); + ArrowBuf buf2 = allocator.buffer(128); + ArrowBuf buf3 = allocator.buffer(32); + // The gap between buf1 and buf2 should be exactly the aligned size of buf1 (64 bytes, + // already 8-byte aligned). + assertThat(buf2.memoryAddress() - buf1.memoryAddress()).isEqualTo(64); + // buf2 is 128 bytes, 8-byte aligned → gap of 128. + assertThat(buf3.memoryAddress() - buf2.memoryAddress()).isEqualTo(128); + + buf1.close(); + buf2.close(); + buf3.close(); + } + + @Test + void testAlignmentBytes() { + // Allocate a non-8-byte-aligned size; the next allocation should still be 8-byte aligned. + ArrowBuf buf1 = allocator.buffer(7); // 7 bytes → aligned to 8 + ArrowBuf buf2 = allocator.buffer(1); + + assertThat(buf2.memoryAddress() - buf1.memoryAddress()).isEqualTo(8); + buf1.close(); + buf2.close(); + } + + @Test + void testAllocateNewChunkWhenFull() { + // Fill the first chunk (1KB) and verify a new chunk is allocated. + // Each allocation is 256 bytes → 4 allocations fill the chunk. + List bufs = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + bufs.add(allocator.buffer(256)); + } + + // The first 4 buffers should be contiguous within one chunk. + long chunkBaseAddress = bufs.get(0).memoryAddress(); + for (int i = 1; i < 4; i++) { + assertThat(bufs.get(i).memoryAddress()).isEqualTo(chunkBaseAddress + (long) i * 256); + } + + assertThat(factory.getActiveChunk().used).isEqualTo(TEST_CHUNK_SIZE); + // The 5th allocation should land in a new chunk. + ArrowBuf overflow = allocator.buffer(256); + // The active chunk should now be a fresh chunk with only the overflow allocation, + // proving a new chunk was created (rather than checking memory address which is + // non-deterministic — the OS may allocate the new chunk right after the first one). 
+ assertThat(factory.getActiveChunk().used).isEqualTo(256); + + overflow.close(); + bufs.forEach(ArrowBuf::close); + } + + @Test + void testChunkRecycledAfterAllBuffersReleased() { + // Allocate a buffer, release it (drains the chunk), then allocate again — should reuse. + ArrowBuf buf1 = allocator.buffer(64); + long firstAddress = buf1.memoryAddress(); + buf1.close(); + + // The chunk was drained (subAllocCount=0) and should be recycled (reset bump pointer). + // Next allocation should reuse the same chunk starting from offset 0. + ArrowBuf buf2 = allocator.buffer(64); + assertThat(buf2.memoryAddress()).isEqualTo(firstAddress); + + buf2.close(); + } + + @Test + void testBatchAllocateAndReleaseCycle() { + // Simulate the batch-friendly lifecycle: allocate many, release all, allocate again. + int bufferCount = 10; + List batch1 = new ArrayList<>(); + for (int i = 0; i < bufferCount; i++) { + batch1.add(allocator.buffer(64)); + } + long batch1BaseAddress = batch1.get(0).memoryAddress(); + + // Release all — chunk should be recycled. + batch1.forEach(ArrowBuf::close); + + // Second batch should reuse the same chunk. + List batch2 = new ArrayList<>(); + for (int i = 0; i < bufferCount; i++) { + batch2.add(allocator.buffer(64)); + } + assertThat(batch2.get(0).memoryAddress()).isEqualTo(batch1BaseAddress); + + batch2.forEach(ArrowBuf::close); + } + + @Test + void testFreeListCapRespected() { + // Create multiple chunks, release them all — only maxFreeChunks should be cached. + // chunkSize=1024, allocate 512 per chunk to create distinct chunks. 
+ List chunk1Bufs = new ArrayList<>(); + // Fill chunk 1 (1024 bytes) + chunk1Bufs.add(allocator.buffer(512)); + chunk1Bufs.add(allocator.buffer(512)); + + // Fill chunk 2 (triggers new chunk) + List chunk2Bufs = new ArrayList<>(); + chunk2Bufs.add(allocator.buffer(512)); + chunk2Bufs.add(allocator.buffer(512)); + + // Fill chunk 3 (triggers new chunk) + List chunk3Bufs = new ArrayList<>(); + chunk3Bufs.add(allocator.buffer(512)); + chunk3Bufs.add(allocator.buffer(512)); + + // Fill chunk 4 (triggers new chunk) + List chunk4Bufs = new ArrayList<>(); + chunk4Bufs.add(allocator.buffer(512)); + chunk4Bufs.add(allocator.buffer(512)); + + // Release all chunks — free-list can hold only 2 (maxFreeChunks=2). + // Active chunk gets reset, inactive ones go to free-list or are destroyed. + chunk1Bufs.forEach(ArrowBuf::close); + chunk2Bufs.forEach(ArrowBuf::close); + chunk3Bufs.forEach(ArrowBuf::close); + chunk4Bufs.forEach(ArrowBuf::close); + + assertThat(factory.getFreeChunks()).hasSize(TEST_MAX_FREE_CHUNKS); + assertThat(factory.getActiveChunk()).isNotNull(); + assertThat(factory.getActiveChunk().used).isEqualTo(0L); + } + + @Test + void testEmptyBufferReturnedFromFactory() { + ArrowBuf empty = factory.empty(); + assertThat(empty).isNotNull(); + assertThat(empty.capacity()).isEqualTo(0); + } + + @Test + void testMultipleSmallBatches() { + // Verify stable memory across multiple batch cycles(smaller than batch size). 
+ for (int batch = 0; batch < 5; batch++) { + List bufs = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + bufs.add(allocator.buffer(16)); + } + bufs.forEach(ArrowBuf::close); + } + + assertThat(factory.getFreeChunks()).hasSize(0); + assertThat(factory.getActiveChunk()).isNotNull(); + assertThat(factory.getActiveChunk().used).isEqualTo(0L); + } + + @Test + void testFactoryCloseReleasesResources() { + ChunkedFactory localFactory = new ChunkedFactory(TEST_CHUNK_SIZE, TEST_MAX_FREE_CHUNKS); + BufferAllocator localAllocator = BufferAllocatorUtil.createBufferAllocator(localFactory); + + ArrowBuf buf = localAllocator.buffer(64); + buf.close(); + + // close the allocator. + localAllocator.close(); + localFactory.close(); + assertThat(localFactory.getFreeChunks()).isEmpty(); + assertThat(localFactory.getActiveChunk()).isNull(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index 6a758b2f4c..c4701170b1 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -35,7 +35,6 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil; import org.apache.fluss.server.TabletManagerBase; import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; import org.apache.fluss.server.kv.autoinc.ZkSequenceGeneratorFactory; @@ -45,6 +44,7 @@ import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocatorUtil; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.FlussPaths; import 
org.apache.fluss.utils.MapUtils; @@ -150,7 +150,7 @@ private KvManager( throws IOException { super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir); this.logManager = logManager; - this.arrowBufferAllocator = BufferAllocatorUtil.createBufferAllocator(); + this.arrowBufferAllocator = BufferAllocatorUtil.createBufferAllocator(null); this.memorySegmentPool = LazyMemorySegmentPool.createServerBufferPool(conf); this.zkClient = zkClient; this.remoteKvDir = FlussPaths.remoteKvDir(conf); From 3afdfcc294a502e30d7b0721737b999ee2cf00bd Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Tue, 14 Apr 2026 15:19:50 +0800 Subject: [PATCH 3/4] fix test --- fluss-test-coverage/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 78ff8b15ee..77133ce997 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -341,7 +341,7 @@ org.apache.fluss.memory.* org.apache.fluss.utils.* org.apache.fluss.exception.* - org.apache.fluss.row.arrow.* + org.apache.fluss.shaded.arrow.org.apache.arrow.memory.* org.apache.fluss.row.columnar.BytesColumnVector.Bytes From 9f2dd02790c46ff235c85514363f7a318b62973f Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Tue, 14 Apr 2026 21:10:49 +0800 Subject: [PATCH 4/4] modified based on anton's advice --- .../memory/ChunkedAllocationManager.java | 76 +++++++++++++++--- .../memory/ChunkedAllocationManagerTest.java | 78 +++++++++++++++++++ fluss-test-coverage/pom.xml | 1 + 3 files changed, 143 insertions(+), 12 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManager.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManager.java index e326c638b8..14e29a2c69 100644 --- a/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManager.java +++ 
b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManager.java @@ -188,6 +188,13 @@ static class Chunk { * instances are released. */ final AtomicInteger subAllocCount = new AtomicInteger(0); + /** + * Generation counter incremented each time this chunk is successfully drained. Used to + * prevent ABA double-recycle: a caller snapshots the generation before the atomic + * decrement; if the generation has changed by the time {@link + * ChunkedFactory#onChunkDrained} runs, the drain is stale and must be skipped. + */ + volatile int drainGeneration; /** Back-reference to the owning factory for recycling on drain. */ final ChunkedFactory factory; @@ -219,10 +226,37 @@ long bumpAllocate(long alignedSize) { /** * Called when a sub-allocation's ArrowBuf is released. If the chunk is now empty (all * sub-allocations freed), notifies the factory for recycling. + * + *

Why not always call {@code onChunkDrained}?

+ * + *

Unconditionally calling {@link ChunkedFactory#onChunkDrained} on every release would + * be thread-safe (the method is {@code synchronized}), but prohibitively expensive: in a + * high-column scenario (e.g., 1000 columns x 3 buffers = 3000 releases per batch), every + * release would contend for the factory lock even though only the very last one actually + * needs to recycle the chunk. + * + *

To avoid this, we first perform a lock-free check: {@code + * subAllocCount.decrementAndGet() == 0}, and only enter the synchronized {@code + * onChunkDrained} when the chunk is truly empty. This reduces lock contention from O(N) to + * O(1) per drain cycle. + * + *

ABA double-recycle problem

+ * + *

However, the gap between {@code subAllocCount.decrementAndGet() == 0} and the + * subsequent {@code onChunkDrained} call is not atomic. During this window another + * thread can complete a full allocate-release cycle on the same chunk, triggering a second + * legitimate drain. Without protection, both callers would enter {@code onChunkDrained} and + * recycle the same chunk twice (duplicate free-list entry or use-after-free). + * + *

The {@link Chunk#drainGeneration} snapshot solves this: we capture the generation + * before the atomic decrement. Inside {@code onChunkDrained}, the generation is + * compared and incremented atomically under the lock, so only the first caller succeeds; + * the stale caller sees a mismatch and is skipped. */ void releaseSubAllocation() { + int gen = drainGeneration; if (subAllocCount.decrementAndGet() == 0) { - factory.onChunkDrained(this); + factory.onChunkDrained(this, gen); } } @@ -263,6 +297,9 @@ public static class ChunkedFactory implements AllocationManager.Factory { /** Pool of empty chunks available for reuse. */ private final Deque freeChunks = new ArrayDeque<>(); + /** Set to true when {@link #close()} is called. */ + private boolean closed; + /** Creates a factory with default chunk size (4MB) and max 3 free chunks. */ public ChunkedFactory() { this(DEFAULT_CHUNK_SIZE, DEFAULT_MAX_FREE_CHUNKS); @@ -309,7 +346,7 @@ public ArrowBuf empty() { * Obtains a chunk for use as the new active chunk. Prefers recycled chunks from the * free-list; falls back to allocating a fresh one. */ - private Chunk obtainChunk() { + private synchronized Chunk obtainChunk() { Chunk recycled = freeChunks.pollFirst(); if (recycled != null) { recycled.resetBump(); @@ -322,18 +359,30 @@ private Chunk obtainChunk() { * Called when a chunk's sub-allocation count reaches zero (all ArrowBufs released). * Recycles the chunk if possible, or frees its memory if the free-list is full. * - *

Thread-safety note: between {@code subAllocCount} reaching 0 and this method acquiring - * the lock, another thread may call {@link #create} and bump-allocate from this chunk (if - * it is still the active chunk). The guard {@code chunk.subAllocCount.get() > 0} handles - * this race. + *

Thread-safety: between {@code subAllocCount} reaching 0 and this method acquiring the + * lock, another thread may complete a full allocate-release cycle on the same chunk (ABA + * problem). The {@code expectedGeneration} parameter detects this: each successful drain + * increments the generation, so a stale caller's snapshot will mismatch. + * + * @param chunk the chunk whose sub-allocation count just reached zero. + * @param expectedGeneration the drain generation snapshot taken before the atomic + * decrement. If it no longer matches, another drain already processed this event. */ - synchronized void onChunkDrained(Chunk chunk) { - // Guard: if new sub-allocations were added between refCount=0 and now, skip. + synchronized void onChunkDrained(Chunk chunk, int expectedGeneration) { + // ABA guard: if another thread already drained and recycled this chunk, skip. + if (chunk.drainGeneration != expectedGeneration) { + return; + } + // Simple re-allocation guard: new sub-allocations arrived after our decrement. if (chunk.subAllocCount.get() > 0) { return; } + chunk.drainGeneration++; - if (chunk == activeChunk) { + if (closed) { + // Factory is closed — no one will ever reuse this chunk. Free it. + chunk.destroy(); + } else if (chunk == activeChunk) { // Still the active chunk — just reset bump pointer for continued use. chunk.resetBump(); } else if (freeChunks.size() < maxFreeChunks) { @@ -346,12 +395,15 @@ synchronized void onChunkDrained(Chunk chunk) { } /** - * Closes this factory, freeing all cached chunks. Active chunks with outstanding - * sub-allocations will be freed when their last ArrowBuf is released. + * Closes this factory, freeing all cached and idle chunks. Active chunks with outstanding + * sub-allocations will be freed when their last ArrowBuf is released (see {@link + * #onChunkDrained}). 
*/ public synchronized void close() { + closed = true; while (!freeChunks.isEmpty()) { - freeChunks.poll().destroy(); + Chunk poll = freeChunks.poll(); + poll.destroy(); } if (activeChunk != null && activeChunk.subAllocCount.get() == 0) { activeChunk.destroy(); diff --git a/fluss-common/src/test/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManagerTest.java b/fluss-common/src/test/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManagerTest.java index 4662ac7aed..e6c1bc01f9 100644 --- a/fluss-common/src/test/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManagerTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManagerTest.java @@ -25,6 +25,8 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; @@ -217,4 +219,80 @@ void testFactoryCloseReleasesResources() { assertThat(localFactory.getFreeChunks()).isEmpty(); assertThat(localFactory.getActiveChunk()).isNull(); } + + @Test + void testConcurrentAllocateAndRelease() throws Exception { + // Stress test for the double-recycle race: multiple threads sharing one factory, + // each doing allocate -> close in a tight loop. Before the drainGeneration fix, + // onChunkDrained could be called twice for the same drain event, causing double-recycle + // (duplicate entries in freeChunks) or use-after-free (native memory corruption). 
+ int threadCount = 8; + int iterationsPerThread = 500; + ChunkedFactory concurrentFactory = + new ChunkedFactory(TEST_CHUNK_SIZE, TEST_MAX_FREE_CHUNKS); + BufferAllocator concurrentAllocator = + BufferAllocatorUtil.createBufferAllocator(concurrentFactory); + + CyclicBarrier barrier = new CyclicBarrier(threadCount); + AtomicReference<Throwable> failure = new AtomicReference<>(); + + Thread[] threads = new Thread[threadCount]; + for (int t = 0; t < threadCount; t++) { + threads[t] = + new Thread( + () -> { + try { + barrier.await(); + for (int i = 0; i < iterationsPerThread; i++) { + ArrowBuf buf = concurrentAllocator.buffer(64); + buf.close(); + } + } catch (Throwable ex) { + failure.compareAndSet(null, ex); + } + }); + threads[t].start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertThat(failure.get()).isNull(); + + // After all threads finish, the free-list must not exceed maxFreeChunks + // (would indicate double-recycle). + assertThat(concurrentFactory.getFreeChunks().size()) + .isLessThanOrEqualTo(TEST_MAX_FREE_CHUNKS); + + concurrentAllocator.close(); + concurrentFactory.close(); + } + + @Test + void testCloseWhileArrowBufsAliveDoesNotLeakMemory() { + // Verify that closing the factory while ArrowBufs are still alive does not leak memory. + ChunkedFactory localFactory = new ChunkedFactory(TEST_CHUNK_SIZE, TEST_MAX_FREE_CHUNKS); + BufferAllocator localAllocator = BufferAllocatorUtil.createBufferAllocator(localFactory); + + // Allocate some buffers so the active chunk has outstanding sub-allocations. + List<ArrowBuf> bufs = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + bufs.add(localAllocator.buffer(64)); + } + + // Close factory while ArrowBufs are still alive. + + localFactory.close(); + + // Active chunk was not destroyed (subAllocCount > 0), just nulled. + assertThat(localFactory.getActiveChunk()).isNull(); + + // Release all ArrowBufs — onChunkDrained should destroy the chunk, not recycle it.
+ bufs.forEach(ArrowBuf::close); + + // The chunk must NOT have been added to freeChunks (would be a leak). + assertThat(localFactory.getFreeChunks()).isEmpty(); + localAllocator.close(); + } } diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index 77133ce997..9746076c42 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -341,6 +341,7 @@ org.apache.fluss.memory.* org.apache.fluss.utils.* org.apache.fluss.exception.* + org.apache.fluss.row.arrow.* org.apache.fluss.shaded.arrow.org.apache.arrow.memory.* org.apache.fluss.row.columnar.BytesColumnVector.Bytes