diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java index f4a3be53f6..efb6f7a18d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java @@ -37,6 +37,7 @@ import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.LimitScanRequest; import org.apache.fluss.rpc.messages.LimitScanResponse; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager; import org.apache.fluss.types.RowType; import org.apache.fluss.utils.CloseableIterator; import org.apache.fluss.utils.SchemaUtil; @@ -66,6 +67,8 @@ public class LimitBatchScanner implements BatchScanner { private final SchemaGetter schemaGetter; private final KvFormat kvFormat; private final int targetSchemaId; + /** The chunked allocation manager factory to reuse memory for arrow log write batch. */ + private final ChunkedAllocationManager.ChunkedFactory chunkedFactory; /** * A cache for schema projection mapping from source schema to target. 
Use HashMap here, because @@ -116,6 +119,7 @@ public LimitBatchScanner( this.kvFormat = tableInfo.getTableConfig().getKvFormat(); this.endOfInput = false; + this.chunkedFactory = new ChunkedAllocationManager.ChunkedFactory(); } @Nullable @@ -164,7 +168,8 @@ private List parseLimitScanResponse(LimitScanResponse limitScanResp } } else { LogRecordReadContext readContext = - LogRecordReadContext.createReadContext(tableInfo, false, null, schemaGetter); + LogRecordReadContext.createReadContext( + tableInfo, false, null, schemaGetter, chunkedFactory); LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer); for (LogRecordBatch logRecordBatch : records.batches()) { // A batch of log record maybe little more than limit, thus we need slice the @@ -203,5 +208,7 @@ private InternalRow maybeProject(InternalRow originRow) { @Override public void close() throws IOException { scanFuture.cancel(true); + // Release off-heap memory held by the chunked allocation manager factory. + chunkedFactory.close(); } } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java index 505b82c127..2625716cde 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java @@ -53,6 +53,7 @@ import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.rpc.util.PredicateMessageUtils; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager; import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf; import org.apache.fluss.utils.IOUtils; import org.apache.fluss.utils.Projection; @@ -94,6 +95,7 @@ public class LogFetcher implements Closeable { // currently can only do project when generate scanRecord instead of doing project while read // bytes from 
remote file. private final LogRecordReadContext remoteReadContext; + private final ChunkedAllocationManager.ChunkedFactory chunkedFactory; @Nullable private final Projection projection; @Nullable private final org.apache.fluss.rpc.messages.PbPredicate cachedPbPredicate; private final int filterSchemaId; @@ -128,10 +130,13 @@ public LogFetcher( SchemaGetter schemaGetter) { this.tablePath = tableInfo.getTablePath(); this.isPartitioned = tableInfo.isPartitioned(); + this.chunkedFactory = new ChunkedAllocationManager.ChunkedFactory(); this.readContext = - LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter); + LogRecordReadContext.createReadContext( + tableInfo, false, projection, schemaGetter, chunkedFactory); this.remoteReadContext = - LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter); + LogRecordReadContext.createReadContext( + tableInfo, true, projection, schemaGetter, chunkedFactory); this.projection = projection; this.cachedPbPredicate = recordBatchFilter != null @@ -603,6 +608,7 @@ public synchronized void close() throws IOException { IOUtils.closeQuietly(remoteLogDownloader, "remoteLogDownloader"); readContext.close(); remoteReadContext.close(); + chunkedFactory.close(); isClosed = true; LOG.info("Fetcher for {} is closed.", tablePath); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index 5f35b1e1cb..9e08e1a95d 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -35,8 +35,8 @@ import org.apache.fluss.record.LogRecordBatchStatisticsCollector; import org.apache.fluss.row.arrow.ArrowWriter; import org.apache.fluss.row.arrow.ArrowWriterPool; -import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil; import 
org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager; import org.apache.fluss.utils.CopyOnWriteMap; import org.apache.fluss.utils.MathUtils; import org.apache.fluss.utils.clock.Clock; @@ -63,6 +63,7 @@ import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE; import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID; +import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocatorUtil.createBufferAllocator; import static org.apache.fluss.utils.Preconditions.checkNotNull; /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache @@ -99,6 +100,9 @@ public final class RecordAccumulator { /** The arrow buffer allocator to allocate memory for arrow log write batch. */ private final BufferAllocator bufferAllocator; + /** The chunked allocation manager factory, stored for explicit native memory release. */ + private final ChunkedAllocationManager.ChunkedFactory chunkedFactory; + /** The pool of lazily created arrow {@link ArrowWriter}s for arrow log write batch. */ private final ArrowWriterPool arrowWriterPool; @@ -134,7 +138,8 @@ public final class RecordAccumulator { Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes()); this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf); - this.bufferAllocator = BufferAllocatorUtil.createBufferAllocator(); + this.chunkedFactory = new ChunkedAllocationManager.ChunkedFactory(); + this.bufferAllocator = createBufferAllocator(chunkedFactory); this.arrowWriterPool = new ArrowWriterPool(bufferAllocator); this.incomplete = new IncompleteBatches(); this.nodesDrainIndex = new HashMap<>(); @@ -964,6 +969,8 @@ public void close() { // Release all the memory segments. 
bufferAllocator.releaseBytes(bufferAllocator.getAllocatedMemory()); bufferAllocator.close(); + // Release native memory held by the chunked allocation manager factory. + chunkedFactory.close(); } /** Per table bucket and write batches. */ diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 29175aabdc..b85547fc10 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -25,8 +25,10 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.InternalRow.FieldGetter; import org.apache.fluss.row.ProjectedRow; -import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationManager; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocatorUtil; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; @@ -68,6 +70,29 @@ public static LogRecordReadContext createReadContext( boolean readFromRemote, @Nullable Projection projection, SchemaGetter schemaGetter) { + return createReadContext( + tableInfo, + readFromRemote, + projection, + schemaGetter, + new ChunkedAllocationManager.ChunkedFactory()); + } + + /** + * Creates a {@link LogRecordReadContext} with a custom {@link AllocationManager.Factory}. 
+ * + * @param tableInfo the table info of the table to read + * @param readFromRemote whether the data is read from remote storage + * @param projection the projection to apply, or null for all fields + * @param schemaGetter the schema getter to resolve schema by id + * @param allocationManagerFactory the factory for creating Arrow memory allocations + */ + public static LogRecordReadContext createReadContext( + TableInfo tableInfo, + boolean readFromRemote, + @Nullable Projection projection, + SchemaGetter schemaGetter, + AllocationManager.Factory allocationManagerFactory) { RowType rowType = tableInfo.getRowType(); LogFormat logFormat = tableInfo.getTableConfig().getLogFormat(); // only for arrow log format, the projection can be push downed to the server side @@ -84,7 +109,12 @@ public static LogRecordReadContext createReadContext( // so set the rowType as is. int[] selectedFields = projection.getProjection(); return createArrowReadContext( - rowType, schemaId, selectedFields, false, schemaGetter); + rowType, + schemaId, + selectedFields, + false, + schemaGetter, + allocationManagerFactory); } else { // arrow data that returned from server has been projected (in order) RowType projectedRowType = projection.projectInOrder(rowType); @@ -95,7 +125,8 @@ public static LogRecordReadContext createReadContext( schemaId, selectedFields, projectionPushDowned, - schemaGetter); + schemaGetter, + allocationManagerFactory); } } else if (logFormat == LogFormat.INDEXED) { int[] selectedFields = projection.getProjection(); @@ -113,9 +144,11 @@ private static LogRecordReadContext createArrowReadContext( int schemaId, int[] selectedFields, boolean projectionPushDowned, - SchemaGetter schemaGetter) { + SchemaGetter schemaGetter, + AllocationManager.Factory allocationManagerFactory) { // TODO: use a more reasonable memory limit - BufferAllocator allocator = BufferAllocatorUtil.createBufferAllocator(); + BufferAllocator allocator = + 
BufferAllocatorUtil.createBufferAllocator(allocationManagerFactory); FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields); return new LogRecordReadContext( LogFormat.ARROW, @@ -139,7 +172,13 @@ private static LogRecordReadContext createArrowReadContext( public static LogRecordReadContext createArrowReadContext( RowType rowType, int schemaId, SchemaGetter schemaGetter) { int[] selectedFields = IntStream.range(0, rowType.getFieldCount()).toArray(); - return createArrowReadContext(rowType, schemaId, selectedFields, false, schemaGetter); + return createArrowReadContext( + rowType, + schemaId, + selectedFields, + false, + schemaGetter, + new ChunkedAllocationManager.ChunkedFactory()); } @VisibleForTesting @@ -150,7 +189,12 @@ public static LogRecordReadContext createArrowReadContext( boolean projectionPushDowned) { int[] selectedFields = IntStream.range(0, rowType.getFieldCount()).toArray(); return createArrowReadContext( - rowType, schemaId, selectedFields, projectionPushDowned, schemaGetter); + rowType, + schemaId, + selectedFields, + projectionPushDowned, + schemaGetter, + new ChunkedAllocationManager.ChunkedFactory()); } /** diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ArrowRoundingPolicy.java similarity index 98% rename from fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java rename to fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ArrowRoundingPolicy.java index bfd65fd220..9e7926b85d 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java +++ b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ArrowRoundingPolicy.java @@ -17,7 +17,7 @@ * under the License. 
*/ -package org.apache.fluss.row.arrow.memory; +package org.apache.fluss.shaded.arrow.org.apache.arrow.memory; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.rounding.RoundingPolicy; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.util.CommonUtil; diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/BufferAllocatorUtil.java similarity index 57% rename from fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java rename to fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/BufferAllocatorUtil.java index 3b0e2914b3..03c408de0c 100644 --- a/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java +++ b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/BufferAllocatorUtil.java @@ -17,19 +17,26 @@ * under the License. */ -package org.apache.fluss.row.arrow.memory; +package org.apache.fluss.shaded.arrow.org.apache.arrow.memory; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationListener; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; +import javax.annotation.Nullable; -import static org.apache.fluss.row.arrow.memory.ArrowRoundingPolicy.ARROW_ROUNDING_POLICY; +import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowRoundingPolicy.ARROW_ROUNDING_POLICY; /** Utility class for creating Arrow BufferAllocators with the custom ArrowRoundingPolicy. */ public class BufferAllocatorUtil { /** Creates a {@link BufferAllocator} configured with the {@link ArrowRoundingPolicy}. 
*/ - public static BufferAllocator createBufferAllocator() { - return new RootAllocator(AllocationListener.NOOP, Long.MAX_VALUE, ARROW_ROUNDING_POLICY); + public static BufferAllocator createBufferAllocator( + @Nullable AllocationManager.Factory allocationManagerFactory) { + ImmutableConfig.Builder builder = + ImmutableConfig.builder() + .listener(AllocationListener.NOOP) + .maxAllocation(Long.MAX_VALUE) + .roundingPolicy(ARROW_ROUNDING_POLICY); + if (allocationManagerFactory != null) { + builder.allocationManagerFactory(allocationManagerFactory); + } + return new RootAllocator(builder.build()); } } diff --git a/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManager.java b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManager.java new file mode 100644 index 0000000000..e326c638b8 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManager.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.shaded.arrow.org.apache.arrow.memory; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.util.MemoryUtil; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * An {@link AllocationManager} that packs small allocations into large pre-allocated chunks using a + * bump-pointer strategy, specifically designed for Arrow column decompression in high-column-count + * scenarios (e.g., 1000+ columns). + * + *

+ * <h3>Why not Netty's PooledByteBufAllocator?</h3>
+ *
+ * <p>Arrow column decompression produces a burst of ~3000 small, variable-sized allocations (1000
+ * columns x ~3 buffers each: validity, offset, data). Netty's allocator is optimized for a
+ * different pattern and causes problems here:
+ *
+ * <table>
+ *   <tr>
+ *     <th>Aspect</th>
+ *     <th>Netty PooledByteBufAllocator</th>
+ *     <th>ChunkedAllocationManager</th>
+ *   </tr>
+ *   <tr>
+ *     <td>Small allocation</td>
+ *     <td>Routes through size-class subpages (tiny/small/normal). Different sizes land on
+ *         different subpages within different pages of the same or different chunks,
+ *         causing cross-chunk fragmentation.</td>
+ *     <td>Sequential bump-pointer within the current chunk. All sizes packed contiguously into
+ *         one chunk regardless of size class.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Chunk selection</td>
+ *     <td>Tries q050 → q025 → q000 → qInit → q075, preferring
+ *         moderately-used chunks. May spread allocations across multiple chunks.</td>
+ *     <td>Always uses the current active chunk. Only switches when full.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Chunk reclamation</td>
+ *     <td>A chunk is freed only when it is in {@code q000} and usage drops to 0%. But
+ *         cross-size-class subpage pinning often prevents any single chunk from reaching 0%.
+ *         The first chunk per arena (in {@code qInit}) is never freed.</td>
+ *     <td>When all sub-allocations in a chunk are released ({@code subAllocCount} reaches 0),
+ *         the chunk is immediately recycled to the free-list.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Thread-local caching</td>
+ *     <td>{@code PoolThreadCache} holds released buffers in thread-local caches, delaying their
+ *         return to the chunk. Cache is trimmed only every 8192 allocations or on thread exit,
+ *         making chunk usage appear higher than actual.</td>
+ *     <td>No thread-local caching. Release decrements {@code subAllocCount} immediately.</td>
+ *   </tr>
+ *   <tr>
+ *     <td>Typical result for 1000-column decompression</td>
+ *     <td>3000 buffers spread across 2-5 chunks. After batch release, chunks remain partially
+ *         used due to subpage pinning + thread cache. Memory grows over successive batches
+ *         → OOM.</td>
+ *     <td>3000 buffers land in 1-2 chunks. After batch release, all chunks immediately
+ *         recyclable. Stable memory footprint across batches.</td>
+ *   </tr>
+ * </table>
+ *

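+ * <h3>How it works</h3>
+ *
+ * <ul>
+ *   <li>Each request is rounded up to an 8-byte boundary and carved from the active chunk by
+ *       advancing a bump pointer.
+ *   <li>When the active chunk has no room left, the factory switches to a pooled empty chunk or
+ *       allocates a new one.
+ *   <li>Every chunk counts its live sub-allocations; once the count drops to zero the chunk is
+ *       reset for reuse, pooled (up to a configurable number of free chunks), or freed.
+ *   <li>Requests larger than the chunk size bypass the chunks and get a dedicated memory region.
+ * </ul>
+ *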

+ * <h3>Batch-friendly lifecycle</h3>
+ *
+ * <pre>
+ *   Batch N:  allocator.buffer() x 3000  →  all land in chunk A (bump pointer)
+ *             process batch...
+ *             release all buffers          →  subAllocCount = 0 → chunk A recycled
+ *
+ *   Batch N+1: allocator.buffer() x 3000  →  reuse chunk A (zero malloc/free)
+ * </pre>
+ *
+ * <h3>Usage</h3>
+ *
+ * <pre>{@code
+ * BufferAllocator allocator = new RootAllocator(
+ *         BaseAllocator.configBuilder()
+ *                 .allocationManagerFactory(new ChunkedAllocationManager.ChunkedFactory())
+ *                 .build());
+ * }</pre>
+ *

+ * <p>For allocations whose 8-byte-aligned size is larger than chunkSize, a dedicated memory
+ * region is allocated directly (no bump pointer), behaving identically to
+ * {@code UnsafeAllocationManager}.
+ */
+public class ChunkedAllocationManager extends AllocationManager {
+
+    /** 8-byte alignment for all sub-allocations within a chunk. */
+    private static final long ALIGNMENT = 8;
+
+    /** Default chunk size: 4MB (matches Netty 4.1+ maxOrder=9). */
+    private static final long DEFAULT_CHUNK_SIZE = 4L * 1024 * 1024;
+
+    /** Default maximum number of empty chunks to keep in the free-list. */
+    private static final int DEFAULT_MAX_FREE_CHUNKS = 3;
+
+    /** Size requested by the caller (unaligned); reported through {@link #getSize()}. */
+    private final long allocatedSize;
+
+    // --- Fields for sub-allocation (small request carved from a shared chunk) ---
+    private final Chunk chunk;
+    private final long offsetInChunk;
+
+    // --- Fields for direct allocation (large request, owns its own memory) ---
+    private final long directAddress;
+
+    /** Sub-allocation carved from a shared {@link Chunk}. */
+    private ChunkedAllocationManager(
+            BufferAllocator accountingAllocator, Chunk chunk, long offset, long size) {
+        super(accountingAllocator);
+        this.chunk = chunk;
+        this.offsetInChunk = offset;
+        this.allocatedSize = size;
+        this.directAddress = 0;
+    }
+
+    /** Direct allocation for oversized requests. Owns its own memory region. */
+    private ChunkedAllocationManager(BufferAllocator accountingAllocator, long address, long size) {
+        super(accountingAllocator);
+        this.chunk = null;
+        this.offsetInChunk = 0;
+        this.allocatedSize = size;
+        this.directAddress = address;
+    }
+
+    @Override
+    public long getSize() {
+        return allocatedSize;
+    }
+
+    @Override
+    protected long memoryAddress() {
+        if (chunk != null) {
+            return chunk.address + offsetInChunk;
+        }
+        return directAddress;
+    }
+
+    /**
+     * Releases the backing memory: a sub-allocation only decrements its chunk's counter, a
+     * direct allocation frees its own region.
+     */
+    @Override
+    protected void release0() {
+        if (chunk != null) {
+            chunk.releaseSubAllocation();
+        } else {
+            MemoryUtil.UNSAFE.freeMemory(directAddress);
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    //  Chunk — a contiguous memory region with bump-pointer sub-allocation
+    // -------------------------------------------------------------------------
+
+    /**
+     * A contiguous native memory region that holds multiple small allocations via bump-pointer.
+     * Reference-counted: when all sub-allocations are released (count reaches 0), the chunk
+     * notifies its factory, which resets, pools or frees it.
+     */
+    static class Chunk {
+        final long address;
+        final long capacity;
+        /** Bump pointer — only accessed under the factory's synchronized lock. */
+        long used;
+        /**
+         * Number of active sub-allocations. Incremented under the factory lock, decremented from
+         * arbitrary threads when ArrowBuf instances are released.
+         */
+        final AtomicInteger subAllocCount = new AtomicInteger(0);
+        /** Back-reference to the owning factory for recycling on drain. */
+        final ChunkedFactory factory;
+        /**
+         * True while the chunk sits in the factory's free-list or after its memory was freed.
+         * Only accessed under the factory lock. A drain notification that arrives for a retired
+         * chunk is stale and must be ignored, otherwise the chunk would be pooled or freed twice.
+         */
+        boolean retired;
+
+        Chunk(long capacity, ChunkedFactory factory) {
+            this.address = MemoryUtil.UNSAFE.allocateMemory(capacity);
+            this.capacity = capacity;
+            this.used = 0;
+            this.factory = factory;
+        }
+
+        /** Returns true if this chunk has room for an aligned allocation of the given size. */
+        boolean hasRoom(long alignedSize) {
+            return used + alignedSize <= capacity;
+        }
+
+        /**
+         * Bump-allocates {@code alignedSize} bytes from this chunk. Must be called under the
+         * factory lock.
+         *
+         * @return the offset within this chunk where the allocation starts.
+         */
+        long bumpAllocate(long alignedSize) {
+            long offset = used;
+            used += alignedSize;
+            subAllocCount.incrementAndGet();
+            return offset;
+        }
+
+        /**
+         * Called when a sub-allocation's ArrowBuf is released. If the chunk is now empty (all
+         * sub-allocations freed), notifies the factory for recycling.
+         */
+        void releaseSubAllocation() {
+            if (subAllocCount.decrementAndGet() == 0) {
+                factory.onChunkDrained(this);
+            }
+        }
+
+        /** Resets the bump pointer so the chunk can be reused for new allocations. */
+        void resetBump() {
+            used = 0;
+            // subAllocCount is already 0 at this point.
+        }
+
+        /** Frees the underlying native memory. */
+        void destroy() {
+            MemoryUtil.UNSAFE.freeMemory(address);
+        }
+    }
+
+    // -------------------------------------------------------------------------
+    //  Factory
+    // -------------------------------------------------------------------------
+
+    /**
+     * Factory that creates {@link ChunkedAllocationManager} instances.
+     *
+     * <p>Allocations whose aligned size fits into a chunk are packed into the current active
+     * chunk via bump-pointer. When the active chunk is full, a recycled or freshly-allocated
+     * chunk is used. Larger allocations get their own dedicated memory region.
+     *
+     * <p>This factory is thread-safe: {@link #create}, {@link #onChunkDrained} and
+     * {@link #close} are {@code synchronized}.
+     */
+    public static class ChunkedFactory implements AllocationManager.Factory {
+
+        private final long chunkSize;
+        private final int maxFreeChunks;
+
+        /** The chunk currently receiving bump allocations. May be null initially. */
+        private Chunk activeChunk;
+
+        /** Pool of empty chunks available for reuse. */
+        private final Deque<Chunk> freeChunks = new ArrayDeque<>();
+
+        /** Set by {@link #close()}; afterwards drained chunks are freed instead of pooled. */
+        private boolean closed;
+
+        /** Creates a factory with default chunk size (4MB) and max 3 free chunks. */
+        public ChunkedFactory() {
+            this(DEFAULT_CHUNK_SIZE, DEFAULT_MAX_FREE_CHUNKS);
+        }
+
+        /**
+         * Creates a factory with the given chunk size and free-chunk pool limit.
+         *
+         * @param chunkSize size of each chunk (bytes). Allocations that do not fit go direct.
+         * @param maxFreeChunks maximum number of empty chunks to keep cached for reuse.
+         */
+        public ChunkedFactory(long chunkSize, int maxFreeChunks) {
+            this.chunkSize = chunkSize;
+            this.maxFreeChunks = maxFreeChunks;
+        }
+
+        /**
+         * Creates an allocation of {@code size} bytes, either carved from the active chunk or, if
+         * it cannot fit into a chunk, backed by its own memory region.
+         */
+        @Override
+        public synchronized AllocationManager create(
+                BufferAllocator accountingAllocator, long size) {
+            // Test the raw size first so the alignment arithmetic below cannot overflow.
+            if (size > chunkSize) {
+                return createDirect(accountingAllocator, size);
+            }
+
+            // Align to 8 bytes for safe direct-memory access.
+            long alignedSize = (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1);
+            if (alignedSize > chunkSize) {
+                // Only possible when chunkSize is not a multiple of 8: the padded request would
+                // run past the end of even an empty chunk, so it must not be bump-allocated.
+                return createDirect(accountingAllocator, size);
+            }
+
+            if (activeChunk == null || !activeChunk.hasRoom(alignedSize)) {
+                // Current chunk is full or doesn't exist — obtain a recycled or new chunk.
+                activeChunk = obtainChunk();
+            }
+
+            long offset = activeChunk.bumpAllocate(alignedSize);
+            return new ChunkedAllocationManager(accountingAllocator, activeChunk, offset, size);
+        }
+
+        @Override
+        public ArrowBuf empty() {
+            return NettyAllocationManager.EMPTY_BUFFER;
+        }
+
+        /** Large allocation: give it its own memory region. */
+        private AllocationManager createDirect(BufferAllocator accountingAllocator, long size) {
+            long address = MemoryUtil.UNSAFE.allocateMemory(size);
+            return new ChunkedAllocationManager(accountingAllocator, address, size);
+        }
+
+        /**
+         * Obtains a chunk for use as the new active chunk. Prefers recycled chunks from the
+         * free-list; falls back to allocating a fresh one.
+         */
+        private Chunk obtainChunk() {
+            Chunk recycled = freeChunks.pollFirst();
+            if (recycled != null) {
+                recycled.retired = false;
+                recycled.resetBump();
+                return recycled;
+            }
+            return new Chunk(chunkSize, this);
+        }
+
+        /**
+         * Called when a chunk's sub-allocation count reaches zero (all ArrowBufs released).
+         * Recycles the chunk if possible, or frees its memory if the free-list is full or the
+         * factory has been closed.
+         *
+         * <p>Thread-safety note: the caller decrements {@code subAllocCount} without holding the
+         * lock, so by the time this method runs the notification may be stale. Two cases are
+         * filtered out: (1) another thread bump-allocated from the chunk in the meantime
+         * ({@code subAllocCount > 0}), and (2) the chunk was re-used and drained again by another
+         * thread, which already pooled or freed it ({@code retired}). Since increments only
+         * happen under this lock, a zero count observed here means no live sub-allocation exists.
+         */
+        synchronized void onChunkDrained(Chunk chunk) {
+            if (chunk.retired || chunk.subAllocCount.get() > 0) {
+                return;
+            }
+
+            if (closed) {
+                // Nobody will reuse or close this chunk later — free it now.
+                if (chunk == activeChunk) {
+                    activeChunk = null;
+                }
+                retireAndDestroy(chunk);
+            } else if (chunk == activeChunk) {
+                // Still the active chunk — just reset bump pointer for continued use.
+                chunk.resetBump();
+            } else if (freeChunks.size() < maxFreeChunks) {
+                // Not active, pool has room — recycle.
+                chunk.retired = true;
+                freeChunks.offerFirst(chunk);
+            } else {
+                // Pool is full — free native memory.
+                retireAndDestroy(chunk);
+            }
+        }
+
+        /**
+         * Closes this factory, freeing all cached chunks. Chunks with outstanding sub-allocations
+         * are freed when their last ArrowBuf is released. Calling this method more than once is
+         * harmless.
+         */
+        public synchronized void close() {
+            closed = true;
+            Chunk pooled;
+            while ((pooled = freeChunks.pollFirst()) != null) {
+                // Pooled chunks are already marked retired.
+                pooled.destroy();
+            }
+            if (activeChunk != null && activeChunk.subAllocCount.get() == 0) {
+                retireAndDestroy(activeChunk);
+            }
+            activeChunk = null;
+        }
+
+        /** Marks the chunk as retired and frees its memory. Must be called under the lock. */
+        private void retireAndDestroy(Chunk chunk) {
+            chunk.retired = true;
+            chunk.destroy();
+        }
+
+        @VisibleForTesting
+        Deque<Chunk> getFreeChunks() {
+            return freeChunks;
+        }
+
+        @VisibleForTesting
+        Chunk getActiveChunk() {
+            return activeChunk;
+        }
+    }
+}
diff --git a/fluss-common/src/test/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManagerTest.java b/fluss-common/src/test/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManagerTest.java
new file mode 100644
index 0000000000..4662ac7aed
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/ChunkedAllocationManagerTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.shaded.arrow.org.apache.arrow.memory;
+
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager.ChunkedFactory;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ChunkedAllocationManager}. */
+class ChunkedAllocationManagerTest {
+
+    /** Use a small chunk size (1KB) for testing to make chunk transitions easy to trigger. */
+    private static final long TEST_CHUNK_SIZE = 1024;
+
+    private static final int TEST_MAX_FREE_CHUNKS = 2;
+
+    private ChunkedFactory factory;
+    private BufferAllocator allocator;
+
+    @BeforeEach
+    void setUp() {
+        factory = new ChunkedFactory(TEST_CHUNK_SIZE, TEST_MAX_FREE_CHUNKS);
+        allocator = BufferAllocatorUtil.createBufferAllocator(factory);
+    }
+
+    @AfterEach
+    void tearDown() {
+        allocator.close();
+        factory.close();
+    }
+
+    @Test
+    void testSmallAllocationPackedIntoSameChunk() {
+        // Multiple small allocations should land in the same chunk (contiguous addresses).
+        ArrowBuf buf1 = allocator.buffer(64);
+        ArrowBuf buf2 = allocator.buffer(128);
+        ArrowBuf buf3 = allocator.buffer(32);
+        // The gap between buf1 and buf2 should be exactly the aligned size of buf1 (64 bytes,
+        // already 8-byte aligned).
+        assertThat(buf2.memoryAddress() - buf1.memoryAddress()).isEqualTo(64);
+        // buf2 is 128 bytes, 8-byte aligned → gap of 128.
+        assertThat(buf3.memoryAddress() - buf2.memoryAddress()).isEqualTo(128);
+
+        buf1.close();
+        buf2.close();
+        buf3.close();
+    }
+
+    @Test
+    void testAlignmentBytes() {
+        // Allocate a non-8-byte-aligned size; the next allocation should still be 8-byte aligned.
+        ArrowBuf buf1 = allocator.buffer(7); // 7 bytes → aligned to 8
+        ArrowBuf buf2 = allocator.buffer(1);
+
+        assertThat(buf2.memoryAddress() - buf1.memoryAddress()).isEqualTo(8);
+        buf1.close();
+        buf2.close();
+    }
+
+    @Test
+    void testAllocateNewChunkWhenFull() {
+        // Fill the first chunk (1KB) and verify a new chunk is allocated.
+        // Each allocation is 256 bytes → 4 allocations fill the chunk.
+        List<ArrowBuf> bufs = new ArrayList<>();
+        for (int i = 0; i < 4; i++) {
+            bufs.add(allocator.buffer(256));
+        }
+
+        // The first 4 buffers should be contiguous within one chunk.
+        long chunkBaseAddress = bufs.get(0).memoryAddress();
+        for (int i = 1; i < 4; i++) {
+            assertThat(bufs.get(i).memoryAddress()).isEqualTo(chunkBaseAddress + (long) i * 256);
+        }
+
+        assertThat(factory.getActiveChunk().used).isEqualTo(TEST_CHUNK_SIZE);
+        // The 5th allocation should land in a new chunk.
+        ArrowBuf overflow = allocator.buffer(256);
+        // The active chunk should now be a fresh chunk with only the overflow allocation,
+        // proving a new chunk was created (rather than checking memory address which is
+        // non-deterministic — the OS may allocate the new chunk right after the first one).
+        assertThat(factory.getActiveChunk().used).isEqualTo(256);
+
+        overflow.close();
+        bufs.forEach(ArrowBuf::close);
+    }
+
+    @Test
+    void testChunkRecycledAfterAllBuffersReleased() {
+        // Allocate a buffer, release it (drains the chunk), then allocate again — should reuse.
+        ArrowBuf buf1 = allocator.buffer(64);
+        long firstAddress = buf1.memoryAddress();
+        buf1.close();
+
+        // The chunk was drained (subAllocCount=0) and should be recycled (reset bump pointer).
+        // Next allocation should reuse the same chunk starting from offset 0.
+        ArrowBuf buf2 = allocator.buffer(64);
+        assertThat(buf2.memoryAddress()).isEqualTo(firstAddress);
+
+        buf2.close();
+    }
+
+    @Test
+    void testBatchAllocateAndReleaseCycle() {
+        // Simulate the batch-friendly lifecycle: allocate many, release all, allocate again.
+        int bufferCount = 10;
+        List<ArrowBuf> batch1 = new ArrayList<>();
+        for (int i = 0; i < bufferCount; i++) {
+            batch1.add(allocator.buffer(64));
+        }
+        long batch1BaseAddress = batch1.get(0).memoryAddress();
+
+        // Release all — chunk should be recycled.
+        batch1.forEach(ArrowBuf::close);
+
+        // Second batch should reuse the same chunk.
+        List<ArrowBuf> batch2 = new ArrayList<>();
+        for (int i = 0; i < bufferCount; i++) {
+            batch2.add(allocator.buffer(64));
+        }
+        assertThat(batch2.get(0).memoryAddress()).isEqualTo(batch1BaseAddress);
+
+        batch2.forEach(ArrowBuf::close);
+    }
+
+    @Test
+    void testFreeListCapRespected() {
+        // Create multiple chunks, release them all — only maxFreeChunks should be cached.
+        // chunkSize=1024, allocate 512 per chunk to create distinct chunks.
+        List<ArrowBuf> chunk1Bufs = new ArrayList<>();
+        // Fill chunk 1 (1024 bytes)
+        chunk1Bufs.add(allocator.buffer(512));
+        chunk1Bufs.add(allocator.buffer(512));
+
+        // Fill chunk 2 (triggers new chunk)
+        List<ArrowBuf> chunk2Bufs = new ArrayList<>();
+        chunk2Bufs.add(allocator.buffer(512));
+        chunk2Bufs.add(allocator.buffer(512));
+
+        // Fill chunk 3 (triggers new chunk)
+        List<ArrowBuf> chunk3Bufs = new ArrayList<>();
+        chunk3Bufs.add(allocator.buffer(512));
+        chunk3Bufs.add(allocator.buffer(512));
+
+        // Fill chunk 4 (triggers new chunk)
+        List<ArrowBuf> chunk4Bufs = new ArrayList<>();
+        chunk4Bufs.add(allocator.buffer(512));
+        chunk4Bufs.add(allocator.buffer(512));
+
+        // Release all chunks — free-list can hold only 2 (maxFreeChunks=2).
+        // Active chunk gets reset, inactive ones go to free-list or are destroyed.
+        chunk1Bufs.forEach(ArrowBuf::close);
+        chunk2Bufs.forEach(ArrowBuf::close);
+        chunk3Bufs.forEach(ArrowBuf::close);
+        chunk4Bufs.forEach(ArrowBuf::close);
+
+        assertThat(factory.getFreeChunks()).hasSize(TEST_MAX_FREE_CHUNKS);
+        assertThat(factory.getActiveChunk()).isNotNull();
+        assertThat(factory.getActiveChunk().used).isEqualTo(0L);
+    }
+
+    @Test
+    void testEmptyBufferReturnedFromFactory() {
+        ArrowBuf empty = factory.empty();
+        assertThat(empty).isNotNull();
+        assertThat(empty.capacity()).isEqualTo(0);
+    }
+
+    @Test
+    void testMultipleSmallBatches() {
+        // Verify stable memory across multiple batch cycles (each smaller than the chunk size).
+        for (int batch = 0; batch < 5; batch++) {
+            List<ArrowBuf> bufs = new ArrayList<>();
+            for (int i = 0; i < 50; i++) {
+                bufs.add(allocator.buffer(16));
+            }
+            bufs.forEach(ArrowBuf::close);
+        }
+
+        assertThat(factory.getFreeChunks()).hasSize(0);
+        assertThat(factory.getActiveChunk()).isNotNull();
+        assertThat(factory.getActiveChunk().used).isEqualTo(0L);
+    }
+
+    @Test
+    void testFactoryCloseReleasesResources() {
+        ChunkedFactory localFactory = new ChunkedFactory(TEST_CHUNK_SIZE, TEST_MAX_FREE_CHUNKS);
+        BufferAllocator localAllocator = BufferAllocatorUtil.createBufferAllocator(localFactory);
+
+        ArrowBuf buf = localAllocator.buffer(64);
+        buf.close();
+
+        // close the allocator.
+        localAllocator.close();
+        localFactory.close();
+        assertThat(localFactory.getFreeChunks()).isEmpty();
+        assertThat(localFactory.getActiveChunk()).isNull();
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
index 6a758b2f4c..c4701170b1 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
@@ -35,7 +35,6 @@
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil;
 import org.apache.fluss.server.TabletManagerBase;
 import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
 import org.apache.fluss.server.kv.autoinc.ZkSequenceGeneratorFactory;
@@ -45,6 +44,7 @@
 import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocatorUtil;
 import org.apache.fluss.utils.FileUtils;
 import org.apache.fluss.utils.FlussPaths;
 import org.apache.fluss.utils.MapUtils;
@@ -150,7 +150,7 @@ private KvManager(
             throws IOException {
         super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
         this.logManager = logManager;
-        this.arrowBufferAllocator = BufferAllocatorUtil.createBufferAllocator();
+        this.arrowBufferAllocator = BufferAllocatorUtil.createBufferAllocator(null);
         this.memorySegmentPool = LazyMemorySegmentPool.createServerBufferPool(conf);
         this.zkClient = zkClient;
         this.remoteKvDir = FlussPaths.remoteKvDir(conf);