From feec9ffd027263aaaa7b6ec26511bf560032cb4e Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Thu, 26 Feb 2026 18:06:05 +0800 Subject: [PATCH 1/3] [common] A custom rounding policy that reduces Arrow's chunk size from 16MB to 4MB which same as netty arena. --- .../fluss/record/FlussRoundingPolicy.java | 117 ++++++++++++++++++ .../fluss/record/LogRecordReadContext.java | 7 +- 2 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java diff --git a/fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java b/fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java new file mode 100644 index 0000000000..bea4bf9a71 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.record; + +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.rounding.RoundingPolicy; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.util.CommonUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A custom rounding policy that reduces Arrow's chunk size from 16MB to 4MB to align with Netty + * 4.1+ memory allocation behavior. + * + *

Arrow's default maxOrder=11 (16MB chunks) can cause memory inefficiency when used with Netty's + * maxOrder=9 (4MB chunks). This class patches the default by using maxOrder=9. + * + *

TODO: Remove this once fixed upstream in apache/arrow-java#1040. + */ +public class FlussRoundingPolicy implements RoundingPolicy { + private static final Logger LOG = LoggerFactory.getLogger(FlussRoundingPolicy.class); + public final long chunkSize; + private static final long MIN_PAGE_SIZE = 4096; + private static final long MAX_CHUNK_SIZE = ((long) Integer.MAX_VALUE + 1) / 2; + private static final long DEFAULT_CHUNK_SIZE; + + static { + long defaultPageSize = + Long.parseLong(System.getProperty("org.apache.memory.allocator.pageSize", "8192")); + try { + validateAndCalculatePageShifts(defaultPageSize); + } catch (Throwable t) { + defaultPageSize = 8192; + } + + int defaultMaxOrder = + Integer.parseInt(System.getProperty("org.apache.memory.allocator.maxOrder", "9")); + try { + validateAndCalculateChunkSize(defaultPageSize, defaultMaxOrder); + } catch (Throwable t) { + defaultMaxOrder = 11; + } + DEFAULT_CHUNK_SIZE = validateAndCalculateChunkSize(defaultPageSize, defaultMaxOrder); + if (LOG.isDebugEnabled()) { + LOG.debug("-Dorg.apache.memory.allocator.pageSize: {}", defaultPageSize); + LOG.debug("-Dorg.apache.memory.allocator.maxOrder: {}", defaultMaxOrder); + } + } + + /** + * Validates page size (must be >= 4096 and power of 2) and returns log2(pageSize). + * + * @throws IllegalArgumentException if validation fails + */ + private static long validateAndCalculatePageShifts(long pageSize) { + if (pageSize < MIN_PAGE_SIZE) { + throw new IllegalArgumentException( + "pageSize: " + pageSize + " (expected: " + MIN_PAGE_SIZE + ")"); + } + + if ((pageSize & pageSize - 1) != 0) { + throw new IllegalArgumentException("pageSize: " + pageSize + " (expected: power of 2)"); + } + + // Logarithm base 2. At this point we know that pageSize is a power of two. + return Long.SIZE - 1L - Long.numberOfLeadingZeros(pageSize); + } + + private static long validateAndCalculateChunkSize(long pageSize, int maxOrder) { + if (maxOrder > 14) { + throw new IllegalArgumentException("maxOrder: " + maxOrder + " (expected: 0-14)"); + } + + // Ensure the resulting chunkSize does not overflow. + long chunkSize = pageSize; + for (long i = maxOrder; i > 0; i--) { + if (chunkSize > MAX_CHUNK_SIZE / 2) { + throw new IllegalArgumentException( + String.format( + "pageSize (%d) << maxOrder (%d) must not exceed %d", + pageSize, maxOrder, MAX_CHUNK_SIZE)); + } + chunkSize <<= 1; + } + return chunkSize; + } + + /** The singleton instance with default chunk size. */ + public static final FlussRoundingPolicy DEFAULT_ROUNDING_POLICY = + new FlussRoundingPolicy(DEFAULT_CHUNK_SIZE); + + private FlussRoundingPolicy(long chunkSize) { + this.chunkSize = chunkSize; + } + + /** Rounds request size to next power of 2 if less than chunk size, otherwise returns as-is. */ + @Override + public long getRoundedSize(long requestSize) { + return requestSize < chunkSize ? CommonUtil.nextPowerOfTwo(requestSize) : requestSize; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 71536c8122..2b31d2b29b 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -25,6 +25,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.InternalRow.FieldGetter; import org.apache.fluss.row.ProjectedRow; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationListener; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; @@ -115,7 +116,11 @@ private static LogRecordReadContext createArrowReadContext( boolean projectionPushDowned, SchemaGetter schemaGetter) { // TODO: use a more reasonable memory limit - BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + BufferAllocator allocator = + new RootAllocator( + AllocationListener.NOOP, + Long.MAX_VALUE, + FlussRoundingPolicy.DEFAULT_ROUNDING_POLICY); FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields); return new LogRecordReadContext( LogFormat.ARROW, From 4a542f0c8d5174e07a9b77bbb2d3cad785e20fd0 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Mon, 13 Apr 2026 10:50:52 +0800 Subject: [PATCH 2/3] Modified based on CR --- .../org/apache/fluss/client/write/RecordAccumulator.java | 4 ++-- .../java/org/apache/fluss/record/FlussRoundingPolicy.java | 8 ++++++++ .../org/apache/fluss/record/LogRecordReadContext.java | 8 +------- .../main/java/org/apache/fluss/server/kv/KvManager.java | 4 ++-- 4 files changed, 13 insertions(+), 11 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index 70988d0e25..579f4d0c42 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -32,11 +32,11 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metrics.MetricNames; +import org.apache.fluss.record.FlussRoundingPolicy; import org.apache.fluss.record.LogRecordBatchStatisticsCollector; import org.apache.fluss.row.arrow.ArrowWriter; import org.apache.fluss.row.arrow.ArrowWriterPool; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; import org.apache.fluss.utils.CopyOnWriteMap; import org.apache.fluss.utils.MathUtils; import org.apache.fluss.utils.clock.Clock; @@ -134,7 +134,7 @@ public final class RecordAccumulator { Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes()); this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf); - this.bufferAllocator = new RootAllocator(Long.MAX_VALUE); + this.bufferAllocator = FlussRoundingPolicy.createBufferAllocator(); this.arrowWriterPool = new ArrowWriterPool(bufferAllocator); this.incomplete = new IncompleteBatches(); this.nodesDrainIndex = new HashMap<>(); diff --git a/fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java b/fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java index bea4bf9a71..3a89586e0e 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java @@ -17,6 +17,9 @@ package org.apache.fluss.record; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationListener; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.rounding.RoundingPolicy; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.util.CommonUtil; @@ -109,6 +112,11 @@ private FlussRoundingPolicy(long chunkSize) { this.chunkSize = chunkSize; } + /** Creates a {@link BufferAllocator} configured with the {@link FlussRoundingPolicy}. */ + public static BufferAllocator createBufferAllocator() { + return new RootAllocator(AllocationListener.NOOP, Long.MAX_VALUE, DEFAULT_ROUNDING_POLICY); + } + /** Rounds request size to next power of 2 if less than chunk size, otherwise returns as-is. */ @Override public long getRoundedSize(long requestSize) { diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 2b31d2b29b..61e0bd0cac 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -25,9 +25,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.InternalRow.FieldGetter; import org.apache.fluss.row.ProjectedRow; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationListener; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; import org.apache.fluss.types.DataType; import org.apache.fluss.types.RowType; @@ -116,11 +114,7 @@ private static LogRecordReadContext createArrowReadContext( boolean projectionPushDowned, SchemaGetter schemaGetter) { // TODO: use a more reasonable memory limit - BufferAllocator allocator = - new RootAllocator( - AllocationListener.NOOP, - Long.MAX_VALUE, - FlussRoundingPolicy.DEFAULT_ROUNDING_POLICY); + BufferAllocator allocator = FlussRoundingPolicy.createBufferAllocator(); FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields); return new LogRecordReadContext( LogFormat.ARROW, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index 1f0c9c2689..b8a08d72a7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -35,6 +35,7 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.record.FlussRoundingPolicy; import org.apache.fluss.server.TabletManagerBase; import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; import org.apache.fluss.server.kv.autoinc.ZkSequenceGeneratorFactory; @@ -44,7 +45,6 @@ import org.apache.fluss.server.metrics.group.TabletServerMetricGroup; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; import org.apache.fluss.utils.FileUtils; import org.apache.fluss.utils.FlussPaths; import org.apache.fluss.utils.MapUtils; @@ -150,7 +150,7 @@ private KvManager( throws IOException { super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir); this.logManager = logManager; - this.arrowBufferAllocator = new RootAllocator(Long.MAX_VALUE); + this.arrowBufferAllocator = FlussRoundingPolicy.createBufferAllocator(); this.memorySegmentPool = LazyMemorySegmentPool.createServerBufferPool(conf); this.zkClient = zkClient; this.remoteKvDir = FlussPaths.remoteKvDir(conf); From 3a05d51dbe35285fcd82931a56fc8143beeb07c2 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Mon, 13 Apr 2026 14:32:11 +0800 Subject: [PATCH 3/3] refactor to ArrowRoundingPolicy and BufferAllocatorUtil --- .../fluss/client/write/RecordAccumulator.java | 4 +- .../fluss/record/LogRecordReadContext.java | 3 +- .../arrow/memory/ArrowRoundingPolicy.java} | 46 ++++++++----------- .../row/arrow/memory/BufferAllocatorUtil.java | 35 ++++++++++++++ .../org/apache/fluss/server/kv/KvManager.java | 4 +- 5 files changed, 61 insertions(+), 31 deletions(-) rename fluss-common/src/main/java/org/apache/fluss/{record/FlussRoundingPolicy.java => row/arrow/memory/ArrowRoundingPolicy.java} (69%) create mode 100644 fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java index 579f4d0c42..5f35b1e1cb 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java @@ -32,10 +32,10 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metrics.MetricNames; -import org.apache.fluss.record.FlussRoundingPolicy; import org.apache.fluss.record.LogRecordBatchStatisticsCollector; import org.apache.fluss.row.arrow.ArrowWriter; import org.apache.fluss.row.arrow.ArrowWriterPool; +import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import org.apache.fluss.utils.CopyOnWriteMap; import org.apache.fluss.utils.MathUtils; @@ -134,7 +134,7 @@ public final class RecordAccumulator { Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes()); this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf); - this.bufferAllocator = FlussRoundingPolicy.createBufferAllocator(); + this.bufferAllocator = BufferAllocatorUtil.createBufferAllocator(); this.arrowWriterPool = new ArrowWriterPool(bufferAllocator); this.incomplete = new IncompleteBatches(); this.nodesDrainIndex = new HashMap<>(); diff --git a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java index 61e0bd0cac..29175aabdc 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java +++ b/fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java @@ -25,6 +25,7 @@ import org.apache.fluss.row.InternalRow; import org.apache.fluss.row.InternalRow.FieldGetter; import org.apache.fluss.row.ProjectedRow; +import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot; import org.apache.fluss.types.DataType; @@ -114,7 +115,7 @@ private static LogRecordReadContext createArrowReadContext( boolean projectionPushDowned, SchemaGetter schemaGetter) { // TODO: use a more reasonable memory limit - BufferAllocator allocator = FlussRoundingPolicy.createBufferAllocator(); + BufferAllocator allocator = BufferAllocatorUtil.createBufferAllocator(); FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields); return new LogRecordReadContext( LogFormat.ARROW, diff --git a/fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java similarity index 69% rename from fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java rename to fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java index 3a89586e0e..bfd65fd220 100644 --- a/fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/ArrowRoundingPolicy.java @@ -1,25 +1,24 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. */ -package org.apache.fluss.record; +package org.apache.fluss.row.arrow.memory; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationListener; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; -import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.rounding.RoundingPolicy; import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.util.CommonUtil; @@ -36,8 +35,8 @@ *

TODO: Remove this once fixed upstream in apache/arrow-java#1040. */ -public class FlussRoundingPolicy implements RoundingPolicy { - private static final Logger LOG = LoggerFactory.getLogger(FlussRoundingPolicy.class); +public class ArrowRoundingPolicy implements RoundingPolicy { + private static final Logger LOG = LoggerFactory.getLogger(ArrowRoundingPolicy.class); public final long chunkSize; private static final long MIN_PAGE_SIZE = 4096; private static final long MAX_CHUNK_SIZE = ((long) Integer.MAX_VALUE + 1) / 2; @@ -105,18 +104,13 @@ private static long validateAndCalculateChunkSize(long pageSize, int maxOrder) { } /** The singleton instance with default chunk size. */ - public static final FlussRoundingPolicy DEFAULT_ROUNDING_POLICY = - new FlussRoundingPolicy(DEFAULT_CHUNK_SIZE); + public static final ArrowRoundingPolicy ARROW_ROUNDING_POLICY = + new ArrowRoundingPolicy(DEFAULT_CHUNK_SIZE); - private FlussRoundingPolicy(long chunkSize) { + private ArrowRoundingPolicy(long chunkSize) { this.chunkSize = chunkSize; } - /** Creates a {@link BufferAllocator} configured with the {@link FlussRoundingPolicy}. */ - public static BufferAllocator createBufferAllocator() { - return new RootAllocator(AllocationListener.NOOP, Long.MAX_VALUE, DEFAULT_ROUNDING_POLICY); - } - /** Rounds request size to next power of 2 if less than chunk size, otherwise returns as-is. */ @Override public long getRoundedSize(long requestSize) { diff --git a/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java b/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java new file mode 100644 index 0000000000..3b0e2914b3 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/row/arrow/memory/BufferAllocatorUtil.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fluss.row.arrow.memory; + +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationListener; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator; +import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator; + +import static org.apache.fluss.row.arrow.memory.ArrowRoundingPolicy.ARROW_ROUNDING_POLICY; + +/** Utility class for creating Arrow BufferAllocators with the custom ArrowRoundingPolicy. */ +public class BufferAllocatorUtil { + + /** Creates a {@link BufferAllocator} configured with the {@link ArrowRoundingPolicy}. */ + public static BufferAllocator createBufferAllocator() { + return new RootAllocator(AllocationListener.NOOP, Long.MAX_VALUE, ARROW_ROUNDING_POLICY); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java index b8a08d72a7..6a758b2f4c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java @@ -35,7 +35,7 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; -import org.apache.fluss.record.FlussRoundingPolicy; +import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil; import org.apache.fluss.server.TabletManagerBase; import org.apache.fluss.server.kv.autoinc.AutoIncrementManager; import org.apache.fluss.server.kv.autoinc.ZkSequenceGeneratorFactory; @@ -150,7 +150,7 @@ private KvManager( throws IOException { super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir); this.logManager = logManager; - this.arrowBufferAllocator = FlussRoundingPolicy.createBufferAllocator(); + this.arrowBufferAllocator = BufferAllocatorUtil.createBufferAllocator(); this.memorySegmentPool = LazyMemorySegmentPool.createServerBufferPool(conf); this.zkClient = zkClient; this.remoteKvDir = FlussPaths.remoteKvDir(conf);