Skip to content

Commit 6b7587f

Browse files
committed
[#2646] feat(netty): Support converting CompositeByteBuf to a direct nio buffer
1 parent 9c0c27d commit 6b7587f

2 files changed

Lines changed: 43 additions & 0 deletions

File tree

common/src/main/java/org/apache/uniffle/common/netty/buffer/NettyManagedBuffer.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
import java.nio.ByteBuffer;
2121

2222
import io.netty.buffer.ByteBuf;
23+
import io.netty.buffer.CompositeByteBuf;
2324
import io.netty.buffer.Unpooled;
2425

26+
import org.apache.uniffle.common.util.NettyUtils;
27+
2528
public class NettyManagedBuffer extends ManagedBuffer {
2629

2730
public static final NettyManagedBuffer EMPTY_BUFFER =
@@ -45,6 +48,24 @@ public ByteBuf byteBuf() {
4548

4649
@Override
4750
public ByteBuffer nioByteBuffer() {
51+
// CompositeByteBuf.nioBuffer will return a heap buffer if the composite buffer has more than
52+
// one component, even
53+
// if all components are direct buffers. In native client scenarios (like gluten), we prefer to
54+
// use direct buffer
55+
// to reduce data copying.
56+
if (buf instanceof CompositeByteBuf
57+
&& buf.isDirect()
58+
&& buf.nioBufferCount() > 1
59+
&& NettyUtils.preferDirectForCompositeBuffer()) {
60+
int length = buf.readableBytes();
61+
ByteBuffer merged = ByteBuffer.allocateDirect(length).order(buf.order());
62+
for (ByteBuffer buf : buf.nioBuffers()) {
63+
merged.put(buf);
64+
}
65+
merged.flip();
66+
return merged;
67+
}
68+
4869
return buf.nioBuffer();
4970
}
5071

common/src/main/java/org/apache/uniffle/common/util/NettyUtils.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.uniffle.common.util;
1919

20+
import java.util.Optional;
2021
import java.util.concurrent.ThreadFactory;
2122
import java.util.concurrent.atomic.AtomicReferenceArray;
2223

@@ -179,4 +180,25 @@ public static UnpooledByteBufAllocator createUnpooledByteBufAllocator(boolean pr
179180
public static long getMaxDirectMemory() {
180181
return MAX_DIRECT_MEMORY_IN_BYTES;
181182
}
183+
184+
private static final String PREFER_DIRECT_FOR_COMPOSITE_BUFFER_PROPERTY_KEY =
185+
"rss.netty.preferDirectForCompositeBuffer";
186+
private static final String PREFER_DIRECT_FOR_COMPOSITE_BUFFER_ENV_KEY =
187+
"RSS_NETTY_PREFER_DIRECT_FOR_COMPOSITE_BUFFER";
188+
private static final boolean PREFER_DIRECT_FOR_COMPOSITE_BUFFER_DEFAULT = false;
189+
private static final boolean _preferDirectForCompositeBuffer;
190+
191+
static {
192+
_preferDirectForCompositeBuffer =
193+
Optional.ofNullable(System.getProperty(PREFER_DIRECT_FOR_COMPOSITE_BUFFER_PROPERTY_KEY))
194+
.map(Boolean::new)
195+
.orElse(
196+
Optional.ofNullable(System.getenv(PREFER_DIRECT_FOR_COMPOSITE_BUFFER_ENV_KEY))
197+
.map(Boolean::new)
198+
.orElse(PREFER_DIRECT_FOR_COMPOSITE_BUFFER_DEFAULT));
199+
}
200+
201+
public static boolean preferDirectForCompositeBuffer() {
202+
return _preferDirectForCompositeBuffer;
203+
}
182204
}

0 commit comments

Comments
 (0)