diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 70988d0e25..5f35b1e1cb 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -35,8 +35,8 @@
 import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
 import org.apache.fluss.row.arrow.ArrowWriter;
 import org.apache.fluss.row.arrow.ArrowWriterPool;
+import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
-import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
 import org.apache.fluss.utils.CopyOnWriteMap;
 import org.apache.fluss.utils.MathUtils;
 import org.apache.fluss.utils.clock.Clock;
@@ -134,7 +134,7 @@ public final class RecordAccumulator {
                 Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes());
 
         this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf);
-        this.bufferAllocator = new RootAllocator(Long.MAX_VALUE);
+        this.bufferAllocator = BufferAllocatorUtil.createBufferAllocator();
         this.arrowWriterPool = new ArrowWriterPool(bufferAllocator);
         this.incomplete = new IncompleteBatches();
         this.nodesDrainIndex = new HashMap<>();
diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
index 71536c8122..29175aabdc 100644
--- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
+++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
@@ -25,8 +25,8 @@
 import org.apache.fluss.row.InternalRow;
 import org.apache.fluss.row.InternalRow.FieldGetter;
 import org.apache.fluss.row.ProjectedRow;
+import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
-import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.fluss.types.DataType;
 import org.apache.fluss.types.RowType;
@@ -115,7 +115,7 @@ private static LogRecordReadContext createArrowReadContext(
             boolean projectionPushDowned,
             SchemaGetter schemaGetter) {
         // TODO: use a more reasonable memory limit
-        BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
+        BufferAllocator allocator = BufferAllocatorUtil.createBufferAllocator();
         FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields);
         return new LogRecordReadContext(
                 LogFormat.ARROW,
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java
new file mode 100644
index 0000000000..bfd65fd220
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.row.arrow.memory;
+
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.rounding.RoundingPolicy;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.util.CommonUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A custom rounding policy that reduces Arrow's chunk size from 16MB to 4MB to align with Netty
+ * 4.1+ memory allocation behavior.
+ *
+ * <p>Arrow's default maxOrder=11 (16MB chunks) can cause memory inefficiency when used with Netty's
+ * maxOrder=9 (4MB chunks). This class patches the default by using maxOrder=9.
+ *
+ * <p>The page size and max order can be overridden through the system properties {@code
+ * org.apache.memory.allocator.pageSize} and {@code org.apache.memory.allocator.maxOrder}. A value
+ * that is malformed or fails validation is replaced by the built-in default (8192 and 9).
+ *
+ * <p>TODO: Remove this once fixed upstream in apache/arrow-java#1040.
+ */
+public final class ArrowRoundingPolicy implements RoundingPolicy {
+    private static final Logger LOG = LoggerFactory.getLogger(ArrowRoundingPolicy.class);
+
+    private static final String PAGE_SIZE_PROPERTY = "org.apache.memory.allocator.pageSize";
+    private static final String MAX_ORDER_PROPERTY = "org.apache.memory.allocator.maxOrder";
+
+    private static final long DEFAULT_PAGE_SIZE = 8192;
+    private static final int DEFAULT_MAX_ORDER = 9;
+    private static final long MIN_PAGE_SIZE = 4096;
+    private static final int MAX_ORDER_LIMIT = 14;
+    private static final long MAX_CHUNK_SIZE = ((long) Integer.MAX_VALUE + 1) / 2;
+    private static final long DEFAULT_CHUNK_SIZE;
+
+    /** Requests smaller than this are rounded up to a power of two; others are left as-is. */
+    public final long chunkSize;
+
+    static {
+        long defaultPageSize = resolvePageSize();
+        int defaultMaxOrder = resolveMaxOrder(defaultPageSize);
+        DEFAULT_CHUNK_SIZE = validateAndCalculateChunkSize(defaultPageSize, defaultMaxOrder);
+        LOG.debug("-D{}: {}", PAGE_SIZE_PROPERTY, defaultPageSize);
+        LOG.debug("-D{}: {}", MAX_ORDER_PROPERTY, defaultMaxOrder);
+    }
+
+    /**
+     * Reads the page size system property. Falls back to the default when the property is unset,
+     * not a number, or rejected by {@link #validateAndCalculatePageShifts(long)}.
+     */
+    private static long resolvePageSize() {
+        String configured = System.getProperty(PAGE_SIZE_PROPERTY);
+        if (configured == null) {
+            return DEFAULT_PAGE_SIZE;
+        }
+        try {
+            long pageSize = Long.parseLong(configured.trim());
+            validateAndCalculatePageShifts(pageSize);
+            return pageSize;
+        } catch (IllegalArgumentException e) {
+            // Also catches NumberFormatException (a subclass) for non-numeric values.
+            LOG.warn(
+                    "Ignoring invalid -D{}={}, falling back to {}",
+                    PAGE_SIZE_PROPERTY,
+                    configured,
+                    DEFAULT_PAGE_SIZE,
+                    e);
+            return DEFAULT_PAGE_SIZE;
+        }
+    }
+
+    /**
+     * Reads the max order system property. Falls back to the default when the property is unset,
+     * not a number, or rejected by {@link #validateAndCalculateChunkSize(long, int)}.
+     */
+    private static int resolveMaxOrder(long pageSize) {
+        String configured = System.getProperty(MAX_ORDER_PROPERTY);
+        if (configured == null) {
+            return DEFAULT_MAX_ORDER;
+        }
+        try {
+            int maxOrder = Integer.parseInt(configured.trim());
+            validateAndCalculateChunkSize(pageSize, maxOrder);
+            return maxOrder;
+        } catch (IllegalArgumentException e) {
+            LOG.warn(
+                    "Ignoring invalid -D{}={}, falling back to {}",
+                    MAX_ORDER_PROPERTY,
+                    configured,
+                    DEFAULT_MAX_ORDER,
+                    e);
+            return DEFAULT_MAX_ORDER;
+        }
+    }
+
+    /**
+     * Validates page size (must be >= 4096 and power of 2) and returns log2(pageSize).
+     *
+     * @throws IllegalArgumentException if validation fails
+     */
+    private static long validateAndCalculatePageShifts(long pageSize) {
+        if (pageSize < MIN_PAGE_SIZE) {
+            throw new IllegalArgumentException(
+                    "pageSize: " + pageSize + " (expected: >= " + MIN_PAGE_SIZE + ")");
+        }
+
+        if ((pageSize & (pageSize - 1)) != 0) {
+            throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)");
+        }
+
+        // Logarithm base 2. At this point we know that pageSize is a power of two.
+        return Long.SIZE - 1L - Long.numberOfLeadingZeros(pageSize);
+    }
+
+    /**
+     * Validates max order (must be within 0-14) and returns {@code pageSize << maxOrder}.
+     *
+     * @throws IllegalArgumentException if maxOrder is out of range or the resulting chunk size
+     *     would exceed the maximum chunk size
+     */
+    private static long validateAndCalculateChunkSize(long pageSize, int maxOrder) {
+        if (maxOrder < 0 || maxOrder > MAX_ORDER_LIMIT) {
+            throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)");
+        }
+
+        // Ensure the resulting chunkSize does not overflow.
+        long chunkSize = pageSize;
+        for (int i = maxOrder; i > 0; i--) {
+            if (chunkSize > MAX_CHUNK_SIZE / 2) {
+                throw new IllegalArgumentException(
+                        String.format(
+                                "pageSize (%d) << maxOrder (%d) must not exceed %d",
+                                pageSize, maxOrder, MAX_CHUNK_SIZE));
+            }
+            chunkSize <<= 1;
+        }
+        return chunkSize;
+    }
+
+    /** The singleton instance with default chunk size. */
+    public static final ArrowRoundingPolicy ARROW_ROUNDING_POLICY =
+            new ArrowRoundingPolicy(DEFAULT_CHUNK_SIZE);
+
+    private ArrowRoundingPolicy(long chunkSize) {
+        this.chunkSize = chunkSize;
+    }
+
+    /** Rounds request size to next power of 2 if less than chunk size, otherwise returns as-is. */
+    @Override
+    public long getRoundedSize(long requestSize) {
+        return requestSize < chunkSize ? CommonUtil.nextPowerOfTwo(requestSize) : requestSize;
+    }
+}
diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java
new file mode 100644
index 0000000000..3b0e2914b3
--- /dev/null
+++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.fluss.row.arrow.memory;
+
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationListener;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
+
+import static org.apache.fluss.row.arrow.memory.ArrowRoundingPolicy.ARROW_ROUNDING_POLICY;
+
+/** Utility class for creating Arrow BufferAllocators with the custom ArrowRoundingPolicy. */
+public final class BufferAllocatorUtil {
+
+    /** Not instantiable: this class only hosts static factory methods. */
+    private BufferAllocatorUtil() {}
+
+    /**
+     * Creates a {@link BufferAllocator} configured with the {@link ArrowRoundingPolicy}.
+     *
+     * <p>Each call builds a new {@link RootAllocator} with a no-op {@link AllocationListener}
+     * and a limit of {@link Long#MAX_VALUE}. The caller owns the returned allocator and should
+     * close it once it is no longer needed.
+     *
+     * @return a new allocator that rounds requests with {@link ArrowRoundingPolicy}
+     */
+    public static BufferAllocator createBufferAllocator() {
+        return new RootAllocator(AllocationListener.NOOP, Long.MAX_VALUE, ARROW_ROUNDING_POLICY);
+    }
+}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
index 1f0c9c2689..6a758b2f4c 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
@@ -35,6 +35,7 @@
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil;
 import org.apache.fluss.server.TabletManagerBase;
 import org.apache.fluss.server.kv.autoinc.AutoIncrementManager;
 import org.apache.fluss.server.kv.autoinc.ZkSequenceGeneratorFactory;
@@ -44,7 +45,6 @@
 import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
-import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
 import org.apache.fluss.utils.FileUtils;
 import org.apache.fluss.utils.FlussPaths;
 import org.apache.fluss.utils.MapUtils;
@@ -150,7 +150,7 @@ private KvManager(
             throws IOException {
         super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
         this.logManager = logManager;
-        this.arrowBufferAllocator = new RootAllocator(Long.MAX_VALUE);
+        this.arrowBufferAllocator = BufferAllocatorUtil.createBufferAllocator();
         this.memorySegmentPool = LazyMemorySegmentPool.createServerBufferPool(conf);
         this.zkClient = zkClient;
         this.remoteKvDir = FlussPaths.remoteKvDir(conf);