Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import java.nio.ByteBuffer;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.uniffle.common.util.NettyUtils;

public class NettyManagedBuffer extends ManagedBuffer {

public static final NettyManagedBuffer EMPTY_BUFFER =
Expand All @@ -45,6 +48,22 @@ public ByteBuf byteBuf() {

@Override
public ByteBuffer nioByteBuffer() {
// CompositeByteBuf.nioBuffer will return a heap buffer if the composite buffer has more than
// one component, even if all components are direct buffers. In native client scenarios
// (like gluten), we prefer to use direct buffer to reduce data copying.
if (buf instanceof CompositeByteBuf
&& buf.isDirect()
&& buf.nioBufferCount() > 1
&& NettyUtils.preferDirectForCompositeBuffer()) {
int length = buf.readableBytes();
ByteBuffer merged = ByteBuffer.allocateDirect(length).order(buf.order());
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although this still involves one data copy, it is an improvement for gluten scenarios.

  • Before, there were two data copies: Netty direct buffer → NIO heap buffer → Gluten native side.
  • After, there is only one data copy: Netty direct buffer → NIO direct buffer → Gluten native side (no copy).

for (ByteBuffer buf : buf.nioBuffers()) {
merged.put(buf);
}
merged.flip();
return merged;
}

return buf.nioBuffer();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.uniffle.common.util;

import java.util.Optional;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReferenceArray;

Expand Down Expand Up @@ -179,4 +180,25 @@ public static UnpooledByteBufAllocator createUnpooledByteBufAllocator(boolean pr
/**
 * Returns the value held in {@code MAX_DIRECT_MEMORY_IN_BYTES}.
 *
 * @return the maximum direct memory in bytes, as recorded by that constant (how the constant is
 *     computed is not visible in this part of the file)
 */
public static long getMaxDirectMemory() {
return MAX_DIRECT_MEMORY_IN_BYTES;
}

private static final String PREFER_DIRECT_FOR_COMPOSITE_BUFFER_PROPERTY_KEY =
"rss.netty.preferDirectForCompositeBuffer";
private static final String PREFER_DIRECT_FOR_COMPOSITE_BUFFER_ENV_KEY =
"RSS_NETTY_PREFER_DIRECT_FOR_COMPOSITE_BUFFER";
private static final boolean PREFER_DIRECT_FOR_COMPOSITE_BUFFER_DEFAULT = false;
private static final boolean _preferDirectForCompositeBuffer;

static {
_preferDirectForCompositeBuffer =
Optional.ofNullable(System.getProperty(PREFER_DIRECT_FOR_COMPOSITE_BUFFER_PROPERTY_KEY))
.map(Boolean::new)
.orElse(
Optional.ofNullable(System.getenv(PREFER_DIRECT_FOR_COMPOSITE_BUFFER_ENV_KEY))
.map(Boolean::new)
.orElse(PREFER_DIRECT_FOR_COMPOSITE_BUFFER_DEFAULT));
}

public static boolean preferDirectForCompositeBuffer() {
return _preferDirectForCompositeBuffer;
}
}
Loading