From 4b6e6899175752560c3204dc8c8c3757245c6e05 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 24 Mar 2026 14:30:35 +0800 Subject: [PATCH 1/3] [IMPROVE] Add traffic size distribution per requestCode in RemotingCodeDistributionHandler --- remoting/pom.xml | 12 + .../remoting/netty/NettyRemotingServer.java | 15 +- .../remoting/netty/NettyServerConfig.java | 9 + .../RemotingCodeDistributionHandler.java | 104 ++++++-- .../RemotingCodeDistributionBenchmark.java | 147 +++++++++++ .../RemotingCodeDistributionHandlerTest.java | 233 ++++++++++++++++-- 6 files changed, 472 insertions(+), 48 deletions(-) create mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionBenchmark.java diff --git a/remoting/pom.xml b/remoting/pom.xml index 94d77b46fc5..fe451939ebc 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -50,5 +50,17 @@ 2.9.0 test + + org.openjdk.jmh + jmh-core + 1.36 + test + + + org.openjdk.jmh + jmh-generator-annprocess + 1.36 + test + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 578c102daa4..c2d50ba7a00 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -124,7 +124,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti protected final NettyEncoder encoder = new NettyEncoder(); protected final NettyConnectManageHandler connectionManageHandler = new NettyConnectManageHandler(); protected final NettyServerHandler serverHandler = new NettyServerHandler(); - protected final RemotingCodeDistributionHandler distributionHandler = new RemotingCodeDistributionHandler(); + protected final RemotingCodeDistributionHandler distributionHandler; public NettyRemotingServer(final NettyServerConfig nettyServerConfig) { this(nettyServerConfig, null); @@ -136,6 +136,7 @@ public NettyRemotingServer(final NettyServerConfig nettyServerConfig, this.serverBootstrap = new ServerBootstrap(); this.nettyServerConfig = nettyServerConfig; this.channelEventListener = channelEventListener; + this.distributionHandler = new RemotingCodeDistributionHandler(nettyServerConfig); this.publicExecutor = buildPublicExecutor(nettyServerConfig); this.scheduledExecutorService = buildScheduleExecutor(); @@ -426,6 +427,18 @@ private void printRemotingCodeDistribution() { TRAFFIC_LOGGER.info("Port: {}, ResponseCode Distribution: {}", nettyServerConfig.getListenPort(), outBoundSnapshotString); } + + String inBoundTrafficSnapshotString = distributionHandler.getInBoundTrafficSnapshotString(); + if (inBoundTrafficSnapshotString != null) { + TRAFFIC_LOGGER.info("Port: {}, RequestCode Traffic(byte): {}", + nettyServerConfig.getListenPort(), inBoundTrafficSnapshotString); + } + + String outBoundTrafficSnapshotString = distributionHandler.getOutBoundTrafficSnapshotString(); + if (outBoundTrafficSnapshotString != null) { + TRAFFIC_LOGGER.info("Port: {}, ResponseCode Traffic(byte): {}", + nettyServerConfig.getListenPort(), outBoundTrafficSnapshotString); + } } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java index 664dee8371c..2d4b9bdf04d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java @@ -50,6 +50,7 @@ public class NettyServerConfig implements Cloneable { * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd */ private boolean useEpollNativeSelector = false; + private boolean enableDetailedTrafficSize = false; public String getBindAddress() { return bindAddress; @@ -199,4 +200,12 @@ public int getShutdownWaitTimeSeconds() { public void setShutdownWaitTimeSeconds(int shutdownWaitTimeSeconds) { this.shutdownWaitTimeSeconds = shutdownWaitTimeSeconds; } + + public boolean isEnableDetailedTrafficSize() { + return enableDetailedTrafficSize; + } + + public void setEnableDetailedTrafficSize(boolean enableDetailedTrafficSize) { + this.enableDetailedTrafficSize = enableDetailedTrafficSize; + } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java index c6a97fe441b..598f49036e1 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java @@ -30,29 +30,71 @@ @ChannelHandler.Sharable public class RemotingCodeDistributionHandler extends ChannelDuplexHandler { - private final ConcurrentMap inboundDistribution; - private final ConcurrentMap outboundDistribution; + private final ConcurrentMap inboundStats; + private final ConcurrentMap outboundStats; + private final NettyServerConfig nettyServerConfig; - public RemotingCodeDistributionHandler() { - inboundDistribution = new ConcurrentHashMap<>(); - outboundDistribution = new ConcurrentHashMap<>(); + public RemotingCodeDistributionHandler(NettyServerConfig nettyServerConfig) { + this.inboundStats = new ConcurrentHashMap<>(); + this.outboundStats = new ConcurrentHashMap<>(); + this.nettyServerConfig = nettyServerConfig; } - private void countInbound(int requestCode) { - LongAdder item = inboundDistribution.computeIfAbsent(requestCode, k -> new LongAdder()); - item.increment(); + void recordInbound(RemotingCommand cmd) { + TrafficStats stats = inboundStats.computeIfAbsent(cmd.getCode(), k -> new TrafficStats()); + stats.count.increment(); + stats.trafficSize.add(calcCommandSize(cmd)); } - private void countOutbound(int responseCode) { - LongAdder item = outboundDistribution.computeIfAbsent(responseCode, k -> new LongAdder()); - item.increment(); + void recordOutbound(RemotingCommand cmd) { + TrafficStats stats = outboundStats.computeIfAbsent(cmd.getCode(), k -> new TrafficStats()); + stats.count.increment(); + stats.trafficSize.add(calcCommandSize(cmd)); + } + + /** + * Protocol fixed overhead in bytes: + *
+     * frameHeader:  totalLen(4) + headerLenMark(4) = 8
+     * fixedHeader:  code(2) + language(1) + version(2) + opaque(4) + flag(4)
+     *            + remarkLenPrefix(4) + extFieldsLenPrefix(4) = 21
+     * 
+ */ + static final int FIXED_OVERHEAD = 4 + 4 + 2 + 1 + 2 + 4 + 4 + 4 + 4; + + private int calcCommandSize(RemotingCommand cmd) { + int size = FIXED_OVERHEAD; + byte[] body = cmd.getBody(); + if (body != null) { + size += body.length; + } + if (nettyServerConfig.isEnableDetailedTrafficSize()) { + size += calcHeaderVariableSize(cmd); + } + return size; + } + + private int calcHeaderVariableSize(RemotingCommand cmd) { + int size = 0; + String remark = cmd.getRemark(); + if (remark != null) { + size += remark.length(); + } + HashMap extFields = cmd.getExtFields(); + if (extFields != null) { + for (Map.Entry entry : extFields.entrySet()) { + if (entry.getKey() != null && entry.getValue() != null) { + size += 2 + entry.getKey().length() + 4 + entry.getValue().length(); + } + } + } + return size; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof RemotingCommand) { - RemotingCommand cmd = (RemotingCommand) msg; - countInbound(cmd.getCode()); + recordInbound((RemotingCommand) msg); } ctx.fireChannelRead(msg); } @@ -60,16 +102,23 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof RemotingCommand) { - RemotingCommand cmd = (RemotingCommand) msg; - countOutbound(cmd.getCode()); + recordOutbound((RemotingCommand) msg); } ctx.write(msg, promise); } - private Map getDistributionSnapshot(Map countMap) { - Map map = new HashMap<>(countMap.size()); - for (Map.Entry entry : countMap.entrySet()) { - map.put(entry.getKey(), entry.getValue().sumThenReset()); + private Map getCountSnapshot(ConcurrentMap statsMap) { + Map map = new HashMap<>(statsMap.size()); + for (Map.Entry entry : statsMap.entrySet()) { + map.put(entry.getKey(), entry.getValue().count.sumThenReset()); + } + return map; + } + + private Map getTrafficSnapshot(ConcurrentMap statsMap) { + Map map = new HashMap<>(statsMap.size()); + for (Map.Entry entry : statsMap.entrySet()) { + map.put(entry.getKey(), entry.getValue().trafficSize.sumThenReset()); } return map; } @@ -95,10 +144,23 @@ private String snapshotToString(Map distribution) { } public String getInBoundSnapshotString() { - return this.snapshotToString(this.getDistributionSnapshot(this.inboundDistribution)); + return this.snapshotToString(this.getCountSnapshot(this.inboundStats)); } public String getOutBoundSnapshotString() { - return this.snapshotToString(this.getDistributionSnapshot(this.outboundDistribution)); + return this.snapshotToString(this.getCountSnapshot(this.outboundStats)); + } + + public String getInBoundTrafficSnapshotString() { + return this.snapshotToString(this.getTrafficSnapshot(this.inboundStats)); + } + + public String getOutBoundTrafficSnapshotString() { + return this.snapshotToString(this.getTrafficSnapshot(this.outboundStats)); + } + + static class TrafficStats { + final LongAdder count = new LongAdder(); + final LongAdder trafficSize = new LongAdder(); } } diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionBenchmark.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionBenchmark.java new file mode 100644 index 00000000000..fda603860d8 --- /dev/null +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionBenchmark.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.netty; + +import java.util.HashMap; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.junit.Ignore; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.results.format.ResultFormatType; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +/** + * Benchmark for RemotingCodeDistributionHandler traffic recording overhead. + *

+ * Benchmarks call recordInbound() directly (package-private) to isolate + * recording overhead from Netty pipeline propagation cost. + *

+ * Use @Param to compare detailed-off (O(1)) vs detailed-on (O(n)) modes. + *

+ * Run via IDE: execute the main method. + */ +@Ignore +@BenchmarkMode({Mode.Throughput, Mode.AverageTime}) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Fork(value = 1, jvmArgs = {"-Xms512m", "-Xmx512m"}) +@Warmup(iterations = 3, time = 3) +@Measurement(iterations = 5, time = 3) +public class RemotingCodeDistributionBenchmark { + + /** + * Shared handler state - handler itself is thread-safe (ConcurrentHashMap + LongAdder). + */ + @State(Scope.Benchmark) + public static class HandlerState { + RemotingCodeDistributionHandler handler; + NettyServerConfig config; + + @Param({"false", "true"}) + boolean enableDetailedTrafficSize; + + @Setup + public void setup() { + config = new NettyServerConfig(); + config.setEnableDetailedTrafficSize(enableDetailedTrafficSize); + handler = new RemotingCodeDistributionHandler(config); + } + } + + /** + * Per-thread commands to avoid contention on RemotingCommand fields. + */ + @State(Scope.Thread) + public static class CommandState { + RemotingCommand cmdNoBody; + RemotingCommand cmdWithBody; + RemotingCommand cmdWithBodyAndExtFields; + + @Setup + public void setup() { + cmdNoBody = RemotingCommand.createRequestCommand(10, null); + + cmdWithBody = RemotingCommand.createRequestCommand(11, null); + cmdWithBody.setBody(new byte[4096]); + + cmdWithBodyAndExtFields = RemotingCommand.createRequestCommand(12, null); + cmdWithBodyAndExtFields.setBody(new byte[4096]); + cmdWithBodyAndExtFields.setRemark("benchmark remark"); + HashMap extFields = new HashMap<>(); + extFields.put("topic", "BenchmarkTopic"); + extFields.put("queueId", "0"); + extFields.put("bornTimestamp", "1700000000000"); + extFields.put("flag", "0"); + extFields.put("properties", "KEYS=key1\u0002TAGS=tagA"); + cmdWithBodyAndExtFields.setExtFields(extFields); + } + } + + @Benchmark + @Threads(1) + public void recordInbound_noBody_singleThread(HandlerState h, CommandState c) { + h.handler.recordInbound(c.cmdNoBody); + } + + @Benchmark + @Threads(1) + public void recordInbound_withBody_singleThread(HandlerState h, CommandState c) { + h.handler.recordInbound(c.cmdWithBody); + } + + /** + * Key comparison: detailed-off vs detailed-on with 5 extFields. + * Shows cost of O(n) extFields iteration when switch is enabled. + */ + @Benchmark + @Threads(1) + public void recordInbound_withBodyAndExtFields_singleThread(HandlerState h, CommandState c) { + h.handler.recordInbound(c.cmdWithBodyAndExtFields); + } + + @Benchmark + @Threads(4) + public void recordInbound_withBody_4threads(HandlerState h, CommandState c) { + h.handler.recordInbound(c.cmdWithBody); + } + + @Benchmark + @Threads(8) + public void recordInbound_withBody_8threads(HandlerState h, CommandState c) { + h.handler.recordInbound(c.cmdWithBody); + } + + public static void main(String[] args) throws Exception { + Options opt = new OptionsBuilder() + .include(RemotingCodeDistributionBenchmark.class.getSimpleName()) + .resultFormat(ResultFormatType.TEXT) + .build(); + new Runner(opt).run(); + } +} diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java index eb623a9de92..4a90bb46c41 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java @@ -16,45 +16,202 @@ */ package org.apache.rocketmq.remoting.netty; -import java.lang.reflect.Method; -import java.time.Duration; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.rocketmq.common.ThreadFactoryImpl; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; -import static org.awaitility.Awaitility.await; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class RemotingCodeDistributionHandlerTest { - private final RemotingCodeDistributionHandler distributionHandler = new RemotingCodeDistributionHandler(); + private NettyServerConfig nettyServerConfig; + private RemotingCodeDistributionHandler handler; + + @Before + public void setUp() { + nettyServerConfig = new NettyServerConfig(); + handler = new RemotingCodeDistributionHandler(nettyServerConfig); + } + + @Test + public void testCountDistribution() { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + ChannelPromise promise = mock(ChannelPromise.class); + + RemotingCommand inCmd = RemotingCommand.createRequestCommand(100, null); + handler.channelRead(ctx, inCmd); + handler.channelRead(ctx, inCmd); + + RemotingCommand outCmd = RemotingCommand.createResponseCommand(0, "ok"); + try { + handler.write(ctx, outCmd, promise); + handler.write(ctx, outCmd, promise); + handler.write(ctx, outCmd, promise); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + Assert.assertEquals("{100:2}", handler.getInBoundSnapshotString()); + Assert.assertEquals("{0:3}", handler.getOutBoundSnapshotString()); + + verify(ctx, times(2)).fireChannelRead(inCmd); + } + + @Test + public void testTrafficSizeWithBody() { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + + byte[] body = new byte[1024]; + RemotingCommand cmd = RemotingCommand.createRequestCommand(200, null); + cmd.setBody(body); + + handler.channelRead(ctx, cmd); + + long expectedSize = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 1024; + Assert.assertEquals("{200:" + expectedSize + "}", handler.getInBoundTrafficSnapshotString()); + } + + @Test + public void testTrafficSizeWithoutBody() { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + + RemotingCommand cmd = RemotingCommand.createRequestCommand(201, null); + handler.channelRead(ctx, cmd); + + long expectedSize = RemotingCodeDistributionHandler.FIXED_OVERHEAD; + Assert.assertEquals("{201:" + expectedSize + "}", handler.getInBoundTrafficSnapshotString()); + } + + @Test + public void testDetailedSizeEnabled() { + nettyServerConfig.setEnableDetailedTrafficSize(true); + + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + + RemotingCommand cmd = RemotingCommand.createRequestCommand(300, null); + cmd.setBody(new byte[512]); + cmd.setRemark("test remark"); + HashMap extFields = new HashMap<>(); + extFields.put("topic", "TestTopic"); + extFields.put("queueId", "0"); + cmd.setExtFields(extFields); + + handler.channelRead(ctx, cmd); + + // FIXED_OVERHEAD + body(512) + // + remark("test remark".length()=11) + // + extField("topic"): keyLenPrefix(2) + "topic"(5) + valLenPrefix(4) + "TestTopic"(9) = 20 + // + extField("queueId"): keyLenPrefix(2) + "queueId"(7) + valLenPrefix(4) + "0"(1) = 14 + long expectedSize = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 512 + 11 + 20 + 14; + Assert.assertEquals("{300:" + expectedSize + "}", handler.getInBoundTrafficSnapshotString()); + } + + @Test + public void testDetailedSizeDisabledIgnoresHeaderFields() { + nettyServerConfig.setEnableDetailedTrafficSize(false); + + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + + RemotingCommand cmd = RemotingCommand.createRequestCommand(301, null); + cmd.setBody(new byte[256]); + cmd.setRemark("some remark"); + HashMap extFields = new HashMap<>(); + extFields.put("key", "value"); + cmd.setExtFields(extFields); + + handler.channelRead(ctx, cmd); + + // When disabled, only FIXED_OVERHEAD + body + long expectedSize = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 256; + Assert.assertEquals("{301:" + expectedSize + "}", handler.getInBoundTrafficSnapshotString()); + } + + @Test + public void testSnapshotResetsAfterRead() { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + + RemotingCommand cmd = RemotingCommand.createRequestCommand(400, null); + handler.channelRead(ctx, cmd); + + // First read should have data + Assert.assertNotNull(handler.getInBoundSnapshotString()); + Assert.assertNotNull(handler.getInBoundTrafficSnapshotString()); + + // Second read should be null (reset by sumThenReset) + Assert.assertNull(handler.getInBoundSnapshotString()); + Assert.assertNull(handler.getInBoundTrafficSnapshotString()); + } @Test - public void remotingCodeCountTest() throws Exception { - Class clazz = RemotingCodeDistributionHandler.class; - Method methodIn = clazz.getDeclaredMethod("countInbound", int.class); - Method methodOut = clazz.getDeclaredMethod("countOutbound", int.class); - methodIn.setAccessible(true); - methodOut.setAccessible(true); + public void testMultipleRequestCodes() { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + + RemotingCommand cmd1 = RemotingCommand.createRequestCommand(10, null); + cmd1.setBody(new byte[100]); + RemotingCommand cmd2 = RemotingCommand.createRequestCommand(20, null); + cmd2.setBody(new byte[200]); + + handler.channelRead(ctx, cmd1); + handler.channelRead(ctx, cmd1); + handler.channelRead(ctx, cmd2); + + String countSnapshot = handler.getInBoundSnapshotString(); + Assert.assertNotNull(countSnapshot); + Assert.assertTrue(countSnapshot.contains("10:2")); + Assert.assertTrue(countSnapshot.contains("20:1")); + + // Traffic was already reset by getInBoundSnapshotString? No, they are separate maps. + // Actually count and traffic are in the same TrafficStats object but + // getCountSnapshot and getTrafficSnapshot reset independently + String trafficSnapshot = handler.getInBoundTrafficSnapshotString(); + Assert.assertNotNull(trafficSnapshot); + long size1 = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 100; + long size2 = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 200; + Assert.assertTrue(trafficSnapshot.contains("10:" + (size1 * 2))); + Assert.assertTrue(trafficSnapshot.contains("20:" + size2)); + } + + @Test + public void testNonRemotingCommandIgnored() { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + + handler.channelRead(ctx, "not a RemotingCommand"); + + Assert.assertNull(handler.getInBoundSnapshotString()); + verify(ctx).fireChannelRead("not a RemotingCommand"); + } + + @Test + public void testConcurrentAccess() throws Exception { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); int threadCount = 4; - int count = 1000 * 1000; + int countPerThread = 100_000; CountDownLatch latch = new CountDownLatch(threadCount); - AtomicBoolean result = new AtomicBoolean(true); - ExecutorService executorService = Executors.newFixedThreadPool(threadCount, new ThreadFactoryImpl("RemotingCodeTest_")); + AtomicBoolean success = new AtomicBoolean(true); + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + byte[] body = new byte[64]; for (int i = 0; i < threadCount; i++) { - executorService.submit(() -> { + executor.submit(() -> { try { - for (int j = 0; j < count; j++) { - methodIn.invoke(distributionHandler, 1); - methodOut.invoke(distributionHandler, 2); + for (int j = 0; j < countPerThread; j++) { + RemotingCommand cmd = RemotingCommand.createRequestCommand(1, null); + cmd.setBody(body); + handler.channelRead(ctx, cmd); } } catch (Exception e) { - result.set(false); + success.set(false); } finally { latch.countDown(); } @@ -62,11 +219,35 @@ public void remotingCodeCountTest() throws Exception { } latch.await(); - Assert.assertTrue(result.get()); - await().pollInterval(Duration.ofMillis(100)).atMost(Duration.ofSeconds(10)).until(() -> { - boolean f1 = ("{1:" + count * threadCount + "}").equals(distributionHandler.getInBoundSnapshotString()); - boolean f2 = ("{2:" + count * threadCount + "}").equals(distributionHandler.getOutBoundSnapshotString()); - return f1 && f2; - }); + Assert.assertTrue(success.get()); + + long totalCount = threadCount * (long) countPerThread; + Assert.assertEquals("{1:" + totalCount + "}", handler.getInBoundSnapshotString()); + + long expectedTotalTraffic = totalCount * (RemotingCodeDistributionHandler.FIXED_OVERHEAD + 64); + Assert.assertEquals("{1:" + expectedTotalTraffic + "}", handler.getInBoundTrafficSnapshotString()); + + executor.shutdown(); + } + + @Test + public void testRuntimeSwitchToggle() { + ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + + RemotingCommand cmd = RemotingCommand.createRequestCommand(500, null); + cmd.setBody(new byte[128]); + cmd.setRemark("hello"); + + // Record with detailed disabled + nettyServerConfig.setEnableDetailedTrafficSize(false); + handler.channelRead(ctx, cmd); + long sizeOff = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 128; + Assert.assertEquals("{500:" + sizeOff + "}", handler.getInBoundTrafficSnapshotString()); + + // Toggle on at runtime + nettyServerConfig.setEnableDetailedTrafficSize(true); + handler.channelRead(ctx, cmd); + long sizeOn = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 128 + 5; // + "hello".length() + Assert.assertEquals("{500:" + sizeOn + "}", handler.getInBoundTrafficSnapshotString()); } -} \ No newline at end of file +} From d96862b3caea957c37665a8bbdc3092f8fc97dcc Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Tue, 24 Mar 2026 14:58:07 +0800 Subject: [PATCH 2/3] Remove JMH --- remoting/pom.xml | 12 -- .../RemotingCodeDistributionBenchmark.java | 147 ------------------ 2 files changed, 159 deletions(-) delete mode 100644 remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionBenchmark.java diff --git a/remoting/pom.xml b/remoting/pom.xml index fe451939ebc..94d77b46fc5 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -50,17 +50,5 @@ 2.9.0 test - - org.openjdk.jmh - jmh-core - 1.36 - test - - - org.openjdk.jmh - jmh-generator-annprocess - 1.36 - test - diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionBenchmark.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionBenchmark.java deleted file mode 100644 index fda603860d8..00000000000 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionBenchmark.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.remoting.netty; - -import java.util.HashMap; -import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.junit.Ignore; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; -import org.openjdk.jmh.annotations.Measurement; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; -import org.openjdk.jmh.annotations.Threads; -import org.openjdk.jmh.annotations.Warmup; -import org.openjdk.jmh.results.format.ResultFormatType; -import org.openjdk.jmh.runner.Runner; -import org.openjdk.jmh.runner.options.Options; -import org.openjdk.jmh.runner.options.OptionsBuilder; - -/** - * Benchmark for RemotingCodeDistributionHandler traffic recording overhead. - *

- * Benchmarks call recordInbound() directly (package-private) to isolate - * recording overhead from Netty pipeline propagation cost. - *

- * Use @Param to compare detailed-off (O(1)) vs detailed-on (O(n)) modes. - *

- * Run via IDE: execute the main method. - */ -@Ignore -@BenchmarkMode({Mode.Throughput, Mode.AverageTime}) -@OutputTimeUnit(TimeUnit.NANOSECONDS) -@Fork(value = 1, jvmArgs = {"-Xms512m", "-Xmx512m"}) -@Warmup(iterations = 3, time = 3) -@Measurement(iterations = 5, time = 3) -public class RemotingCodeDistributionBenchmark { - - /** - * Shared handler state - handler itself is thread-safe (ConcurrentHashMap + LongAdder). - */ - @State(Scope.Benchmark) - public static class HandlerState { - RemotingCodeDistributionHandler handler; - NettyServerConfig config; - - @Param({"false", "true"}) - boolean enableDetailedTrafficSize; - - @Setup - public void setup() { - config = new NettyServerConfig(); - config.setEnableDetailedTrafficSize(enableDetailedTrafficSize); - handler = new RemotingCodeDistributionHandler(config); - } - } - - /** - * Per-thread commands to avoid contention on RemotingCommand fields. - */ - @State(Scope.Thread) - public static class CommandState { - RemotingCommand cmdNoBody; - RemotingCommand cmdWithBody; - RemotingCommand cmdWithBodyAndExtFields; - - @Setup - public void setup() { - cmdNoBody = RemotingCommand.createRequestCommand(10, null); - - cmdWithBody = RemotingCommand.createRequestCommand(11, null); - cmdWithBody.setBody(new byte[4096]); - - cmdWithBodyAndExtFields = RemotingCommand.createRequestCommand(12, null); - cmdWithBodyAndExtFields.setBody(new byte[4096]); - cmdWithBodyAndExtFields.setRemark("benchmark remark"); - HashMap extFields = new HashMap<>(); - extFields.put("topic", "BenchmarkTopic"); - extFields.put("queueId", "0"); - extFields.put("bornTimestamp", "1700000000000"); - extFields.put("flag", "0"); - extFields.put("properties", "KEYS=key1\u0002TAGS=tagA"); - cmdWithBodyAndExtFields.setExtFields(extFields); - } - } - - @Benchmark - @Threads(1) - public void recordInbound_noBody_singleThread(HandlerState h, CommandState c) { - h.handler.recordInbound(c.cmdNoBody); - } - - @Benchmark - @Threads(1) - public void recordInbound_withBody_singleThread(HandlerState h, CommandState c) { - h.handler.recordInbound(c.cmdWithBody); - } - - /** - * Key comparison: detailed-off vs detailed-on with 5 extFields. - * Shows cost of O(n) extFields iteration when switch is enabled. - */ - @Benchmark - @Threads(1) - public void recordInbound_withBodyAndExtFields_singleThread(HandlerState h, CommandState c) { - h.handler.recordInbound(c.cmdWithBodyAndExtFields); - } - - @Benchmark - @Threads(4) - public void recordInbound_withBody_4threads(HandlerState h, CommandState c) { - h.handler.recordInbound(c.cmdWithBody); - } - - @Benchmark - @Threads(8) - public void recordInbound_withBody_8threads(HandlerState h, CommandState c) { - h.handler.recordInbound(c.cmdWithBody); - } - - public static void main(String[] args) throws Exception { - Options opt = new OptionsBuilder() - .include(RemotingCodeDistributionBenchmark.class.getSimpleName()) - .resultFormat(ResultFormatType.TEXT) - .build(); - new Runner(opt).run(); - } -} From dd2bb4b3f9804fe78186be14fcf266b4357e3220 Mon Sep 17 00:00:00 2001 From: RongtongJin Date: Wed, 25 Mar 2026 14:37:28 +0800 Subject: [PATCH 3/3] [IMPROVE] Refactor traffic stats to use precise ByteBuf wire size in encoder/decoder --- .../remoting/RemotingProtocolHandler.java | 3 +- .../rocketmq/remoting/netty/NettyDecoder.java | 13 ++ .../rocketmq/remoting/netty/NettyEncoder.java | 15 ++ .../remoting/netty/NettyRemotingServer.java | 8 +- .../remoting/netty/NettyServerConfig.java | 9 - .../RemotingCodeDistributionHandler.java | 145 ++++--------- .../RemotingCodeDistributionHandlerTest.java | 194 +++--------------- 7 files changed, 101 insertions(+), 286 deletions(-) diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java index 49fea89cdd3..1da4432618e 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/remoting/protocol/remoting/RemotingProtocolHandler.java @@ -52,8 +52,7 @@ public boolean match(ByteBuf in) { public void config(ChannelHandlerContext ctx, ByteBuf msg) { ctx.pipeline().addLast( this.encoderSupplier.get(), - new NettyDecoder(), - this.remotingCodeDistributionHandlerSupplier.get(), + new NettyDecoder(this.remotingCodeDistributionHandlerSupplier.get()), this.connectionManageHandlerSupplier.get(), this.serverHandlerSupplier.get() ); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java index 19624d74028..50760a9a056 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyDecoder.java @@ -32,8 +32,15 @@ public class NettyDecoder extends LengthFieldBasedFrameDecoder { private static final int FRAME_MAX_LENGTH = Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216")); + private final RemotingCodeDistributionHandler distributionHandler; + public NettyDecoder() { + this(null); + } + + public NettyDecoder(RemotingCodeDistributionHandler distributionHandler) { super(FRAME_MAX_LENGTH, 0, 4, 0, 4); + this.distributionHandler = distributionHandler; } @Override @@ -45,7 +52,13 @@ public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (null == frame) { return null; } + // readableBytes() is the frame size after stripping the 4-byte length prefix; + // add 4 back to get the actual wire size. + int wireSize = frame.readableBytes() + 4; RemotingCommand cmd = RemotingCommand.decode(frame); + if (distributionHandler != null) { + distributionHandler.recordInbound(cmd.getCode(), wireSize); + } cmd.setProcessTimer(timer); return cmd; } catch (Exception e) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java index 2af0af6b725..6cfa63d471d 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java @@ -30,15 +30,30 @@ public class NettyEncoder extends MessageToByteEncoder { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME); + private final RemotingCodeDistributionHandler distributionHandler; + + public NettyEncoder() { + this(null); + } + + public NettyEncoder(RemotingCodeDistributionHandler distributionHandler) { + this.distributionHandler = distributionHandler; + } + @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { + int beginIndex = out.writerIndex(); remotingCommand.fastEncodeHeader(out); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } + if (distributionHandler != null) { + distributionHandler.recordOutbound( + remotingCommand.getCode(), out.writerIndex() - beginIndex); + } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index c2d50ba7a00..06f9314a638 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -121,7 +121,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti // sharable handlers protected final TlsModeHandler tlsModeHandler = new TlsModeHandler(TlsSystemConfig.tlsMode); - protected final NettyEncoder encoder = new NettyEncoder(); + protected final NettyEncoder encoder; protected final NettyConnectManageHandler connectionManageHandler = new NettyConnectManageHandler(); protected final NettyServerHandler serverHandler = new NettyServerHandler(); protected final RemotingCodeDistributionHandler distributionHandler; @@ -136,7 +136,8 @@ public NettyRemotingServer(final NettyServerConfig nettyServerConfig, this.serverBootstrap = new ServerBootstrap(); this.nettyServerConfig = nettyServerConfig; this.channelEventListener = channelEventListener; - this.distributionHandler = new RemotingCodeDistributionHandler(nettyServerConfig); + this.distributionHandler = new RemotingCodeDistributionHandler(); + this.encoder = new NettyEncoder(distributionHandler); this.publicExecutor = buildPublicExecutor(nettyServerConfig); this.scheduledExecutorService = buildScheduleExecutor(); @@ -277,8 +278,7 @@ protected ChannelPipeline configChannel(SocketChannel ch) { HANDSHAKE_HANDLER_NAME, new HandshakeHandler()) .addLast(getDefaultEventExecutorGroup(), encoder, - new NettyDecoder(), - distributionHandler, + new NettyDecoder(distributionHandler), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), connectionManageHandler, diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java index 2d4b9bdf04d..664dee8371c 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyServerConfig.java @@ -50,7 +50,6 @@ public class NettyServerConfig implements Cloneable { * --host=x86_64-linux-gnu \ --build=x86_64-pc-linux-gnu \ --without-gd */ private boolean useEpollNativeSelector = false; - private boolean enableDetailedTrafficSize = false; public String getBindAddress() { return bindAddress; @@ -200,12 +199,4 @@ public int getShutdownWaitTimeSeconds() { public void setShutdownWaitTimeSeconds(int shutdownWaitTimeSeconds) { this.shutdownWaitTimeSeconds = shutdownWaitTimeSeconds; } - - public boolean isEnableDetailedTrafficSize() { - return enableDetailedTrafficSize; - } - - public void setEnableDetailedTrafficSize(boolean enableDetailedTrafficSize) { - this.enableDetailedTrafficSize = enableDetailedTrafficSize; - } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java index 598f49036e1..e23c2f3fc92 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandler.java @@ -16,147 +16,72 @@ */ package org.apache.rocketmq.remoting.netty; -import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.LongAdder; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -@ChannelHandler.Sharable -public class RemotingCodeDistributionHandler extends ChannelDuplexHandler { - - private final ConcurrentMap inboundStats; - private final ConcurrentMap outboundStats; - private final NettyServerConfig nettyServerConfig; +/** + * Thread-safe tracker for per-requestCode count and traffic distribution. + *

+ */ +public class RemotingCodeDistributionHandler { - public RemotingCodeDistributionHandler(NettyServerConfig nettyServerConfig) { - this.inboundStats = new ConcurrentHashMap<>(); - this.outboundStats = new ConcurrentHashMap<>(); - this.nettyServerConfig = nettyServerConfig; - } + private final ConcurrentMap inboundStats = new ConcurrentHashMap<>(); + private final ConcurrentMap outboundStats = new ConcurrentHashMap<>(); - void recordInbound(RemotingCommand cmd) { - TrafficStats stats = inboundStats.computeIfAbsent(cmd.getCode(), k -> new TrafficStats()); + public void recordInbound(int code, int wireSize) { + TrafficStats stats = inboundStats.computeIfAbsent(code, k -> new TrafficStats()); stats.count.increment(); - stats.trafficSize.add(calcCommandSize(cmd)); + stats.trafficSize.add(wireSize); } - void recordOutbound(RemotingCommand cmd) { - TrafficStats stats = outboundStats.computeIfAbsent(cmd.getCode(), k -> new TrafficStats()); + public void recordOutbound(int code, int wireSize) { + TrafficStats stats = outboundStats.computeIfAbsent(code, k -> new TrafficStats()); stats.count.increment(); - stats.trafficSize.add(calcCommandSize(cmd)); + stats.trafficSize.add(wireSize); } - /** - * Protocol fixed overhead in bytes: - *

-     * frameHeader:  totalLen(4) + headerLenMark(4) = 8
-     * fixedHeader:  code(2) + language(1) + version(2) + opaque(4) + flag(4)
-     *            + remarkLenPrefix(4) + extFieldsLenPrefix(4) = 21
-     * 
- */ - static final int FIXED_OVERHEAD = 4 + 4 + 2 + 1 + 2 + 4 + 4 + 4 + 4; - - private int calcCommandSize(RemotingCommand cmd) { - int size = FIXED_OVERHEAD; - byte[] body = cmd.getBody(); - if (body != null) { - size += body.length; - } - if (nettyServerConfig.isEnableDetailedTrafficSize()) { - size += calcHeaderVariableSize(cmd); - } - return size; - } - - private int calcHeaderVariableSize(RemotingCommand cmd) { - int size = 0; - String remark = cmd.getRemark(); - if (remark != null) { - size += remark.length(); - } - HashMap extFields = cmd.getExtFields(); - if (extFields != null) { - for (Map.Entry entry : extFields.entrySet()) { - if (entry.getKey() != null && entry.getValue() != null) { - size += 2 + entry.getKey().length() + 4 + entry.getValue().length(); - } - } - } - return size; + public String getInBoundSnapshotString() { + return snapshotToString(getSnapshot(inboundStats, true)); } - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) { - if (msg instanceof RemotingCommand) { - recordInbound((RemotingCommand) msg); - } - ctx.fireChannelRead(msg); + public String getOutBoundSnapshotString() { + return snapshotToString(getSnapshot(outboundStats, true)); } - @Override - public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { - if (msg instanceof RemotingCommand) { - recordOutbound((RemotingCommand) msg); - } - ctx.write(msg, promise); + public String getInBoundTrafficSnapshotString() { + return snapshotToString(getSnapshot(inboundStats, false)); } - private Map getCountSnapshot(ConcurrentMap statsMap) { - Map map = new HashMap<>(statsMap.size()); - for (Map.Entry entry : statsMap.entrySet()) { - map.put(entry.getKey(), entry.getValue().count.sumThenReset()); - } - return map; + public String getOutBoundTrafficSnapshotString() { + return snapshotToString(getSnapshot(outboundStats, false)); } - private Map getTrafficSnapshot(ConcurrentMap statsMap) { + private Map getSnapshot(ConcurrentMap statsMap, boolean count) { Map map = new HashMap<>(statsMap.size()); for (Map.Entry entry : statsMap.entrySet()) { - map.put(entry.getKey(), entry.getValue().trafficSize.sumThenReset()); + LongAdder adder = count ? entry.getValue().count : entry.getValue().trafficSize; + map.put(entry.getKey(), adder.sumThenReset()); } return map; } private String snapshotToString(Map distribution) { - if (null != distribution && !distribution.isEmpty()) { - StringBuilder sb = new StringBuilder("{"); - boolean first = true; - for (Map.Entry entry : distribution.entrySet()) { - if (0L == entry.getValue()) { - continue; - } - sb.append(first ? "" : ", ").append(entry.getKey()).append(":").append(entry.getValue()); - first = false; - } - if (first) { - return null; + if (null == distribution || distribution.isEmpty()) { + return null; + } + StringBuilder sb = new StringBuilder("{"); + boolean first = true; + for (Map.Entry entry : distribution.entrySet()) { + if (0L == entry.getValue()) { + continue; } - sb.append("}"); - return sb.toString(); + sb.append(first ? "" : ", ").append(entry.getKey()).append(":").append(entry.getValue()); + first = false; } - return null; - } - - public String getInBoundSnapshotString() { - return this.snapshotToString(this.getCountSnapshot(this.inboundStats)); - } - - public String getOutBoundSnapshotString() { - return this.snapshotToString(this.getCountSnapshot(this.outboundStats)); - } - - public String getInBoundTrafficSnapshotString() { - return this.snapshotToString(this.getTrafficSnapshot(this.inboundStats)); - } - - public String getOutBoundTrafficSnapshotString() { - return this.snapshotToString(this.getTrafficSnapshot(this.outboundStats)); + return first ? null : sb.append("}").toString(); } static class TrafficStats { diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java index 4a90bb46c41..36eab6fa09a 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/RemotingCodeDistributionHandlerTest.java @@ -16,199 +16,93 @@ */ package org.apache.rocketmq.remoting.netty; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPromise; -import java.util.HashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - public class RemotingCodeDistributionHandlerTest { - private NettyServerConfig nettyServerConfig; private RemotingCodeDistributionHandler handler; @Before public void setUp() { - nettyServerConfig = new NettyServerConfig(); - handler = new RemotingCodeDistributionHandler(nettyServerConfig); + handler = new RemotingCodeDistributionHandler(); } @Test - public void testCountDistribution() { - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - ChannelPromise promise = mock(ChannelPromise.class); - - RemotingCommand inCmd = RemotingCommand.createRequestCommand(100, null); - handler.channelRead(ctx, inCmd); - handler.channelRead(ctx, inCmd); - - RemotingCommand outCmd = RemotingCommand.createResponseCommand(0, "ok"); - try { - handler.write(ctx, outCmd, promise); - handler.write(ctx, outCmd, promise); - handler.write(ctx, outCmd, promise); - } catch (Exception e) { - Assert.fail(e.getMessage()); - } + public void testInboundCountAndTraffic() { + handler.recordInbound(100, 512); + handler.recordInbound(100, 1024); Assert.assertEquals("{100:2}", handler.getInBoundSnapshotString()); - Assert.assertEquals("{0:3}", handler.getOutBoundSnapshotString()); - - verify(ctx, times(2)).fireChannelRead(inCmd); - } - - @Test - public void testTrafficSizeWithBody() { - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - - byte[] body = new byte[1024]; - RemotingCommand cmd = RemotingCommand.createRequestCommand(200, null); - cmd.setBody(body); - - handler.channelRead(ctx, cmd); - - long expectedSize = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 1024; - Assert.assertEquals("{200:" + expectedSize + "}", handler.getInBoundTrafficSnapshotString()); + Assert.assertEquals("{100:1536}", handler.getInBoundTrafficSnapshotString()); } @Test - public void testTrafficSizeWithoutBody() { - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - - RemotingCommand cmd = RemotingCommand.createRequestCommand(201, null); - handler.channelRead(ctx, cmd); - - long expectedSize = RemotingCodeDistributionHandler.FIXED_OVERHEAD; - Assert.assertEquals("{201:" + expectedSize + "}", handler.getInBoundTrafficSnapshotString()); - } + public void testOutboundCountAndTraffic() { + handler.recordOutbound(0, 256); + handler.recordOutbound(0, 256); + handler.recordOutbound(0, 512); - @Test - public void testDetailedSizeEnabled() { - nettyServerConfig.setEnableDetailedTrafficSize(true); - - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - - RemotingCommand cmd = RemotingCommand.createRequestCommand(300, null); - cmd.setBody(new byte[512]); - cmd.setRemark("test remark"); - HashMap extFields = new HashMap<>(); - extFields.put("topic", "TestTopic"); - extFields.put("queueId", "0"); - cmd.setExtFields(extFields); - - handler.channelRead(ctx, cmd); - - // FIXED_OVERHEAD + body(512) - // + remark("test remark".length()=11) - // + extField("topic"): keyLenPrefix(2) + "topic"(5) + valLenPrefix(4) + "TestTopic"(9) = 20 - // + extField("queueId"): keyLenPrefix(2) + "queueId"(7) + valLenPrefix(4) + "0"(1) = 14 - long expectedSize = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 512 + 11 + 20 + 14; - Assert.assertEquals("{300:" + expectedSize + "}", handler.getInBoundTrafficSnapshotString()); + Assert.assertEquals("{0:3}", handler.getOutBoundSnapshotString()); + Assert.assertEquals("{0:1024}", handler.getOutBoundTrafficSnapshotString()); } @Test - public void testDetailedSizeDisabledIgnoresHeaderFields() { - nettyServerConfig.setEnableDetailedTrafficSize(false); - - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - - RemotingCommand cmd = RemotingCommand.createRequestCommand(301, null); - cmd.setBody(new byte[256]); - cmd.setRemark("some remark"); - HashMap extFields = new HashMap<>(); - extFields.put("key", "value"); - cmd.setExtFields(extFields); + public void testMultipleRequestCodes() { + handler.recordInbound(10, 200); + handler.recordInbound(10, 200); + handler.recordInbound(20, 300); - handler.channelRead(ctx, cmd); + String countSnapshot = handler.getInBoundSnapshotString(); + Assert.assertNotNull(countSnapshot); + Assert.assertTrue(countSnapshot.contains("10:2")); + Assert.assertTrue(countSnapshot.contains("20:1")); - // When disabled, only FIXED_OVERHEAD + body - long expectedSize = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 256; - Assert.assertEquals("{301:" + expectedSize + "}", handler.getInBoundTrafficSnapshotString()); + String trafficSnapshot = handler.getInBoundTrafficSnapshotString(); + Assert.assertNotNull(trafficSnapshot); + Assert.assertTrue(trafficSnapshot.contains("10:400")); + Assert.assertTrue(trafficSnapshot.contains("20:300")); } @Test public void testSnapshotResetsAfterRead() { - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); + handler.recordInbound(400, 100); - RemotingCommand cmd = RemotingCommand.createRequestCommand(400, null); - handler.channelRead(ctx, cmd); - - // First read should have data Assert.assertNotNull(handler.getInBoundSnapshotString()); Assert.assertNotNull(handler.getInBoundTrafficSnapshotString()); - // Second read should be null (reset by sumThenReset) + // Second read returns null after sumThenReset Assert.assertNull(handler.getInBoundSnapshotString()); Assert.assertNull(handler.getInBoundTrafficSnapshotString()); } @Test - public void testMultipleRequestCodes() { - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - - RemotingCommand cmd1 = RemotingCommand.createRequestCommand(10, null); - cmd1.setBody(new byte[100]); - RemotingCommand cmd2 = RemotingCommand.createRequestCommand(20, null); - cmd2.setBody(new byte[200]); - - handler.channelRead(ctx, cmd1); - handler.channelRead(ctx, cmd1); - handler.channelRead(ctx, cmd2); - - String countSnapshot = handler.getInBoundSnapshotString(); - Assert.assertNotNull(countSnapshot); - Assert.assertTrue(countSnapshot.contains("10:2")); - Assert.assertTrue(countSnapshot.contains("20:1")); - - // Traffic was already reset by getInBoundSnapshotString? No, they are separate maps. - // Actually count and traffic are in the same TrafficStats object but - // getCountSnapshot and getTrafficSnapshot reset independently - String trafficSnapshot = handler.getInBoundTrafficSnapshotString(); - Assert.assertNotNull(trafficSnapshot); - long size1 = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 100; - long size2 = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 200; - Assert.assertTrue(trafficSnapshot.contains("10:" + (size1 * 2))); - Assert.assertTrue(trafficSnapshot.contains("20:" + size2)); - } - - @Test - public void testNonRemotingCommandIgnored() { - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - - handler.channelRead(ctx, "not a RemotingCommand"); - + public void testEmptySnapshotReturnsNull() { Assert.assertNull(handler.getInBoundSnapshotString()); - verify(ctx).fireChannelRead("not a RemotingCommand"); + Assert.assertNull(handler.getOutBoundSnapshotString()); + Assert.assertNull(handler.getInBoundTrafficSnapshotString()); + Assert.assertNull(handler.getOutBoundTrafficSnapshotString()); } @Test public void testConcurrentAccess() throws Exception { - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - int threadCount = 4; int countPerThread = 100_000; + int wireSize = 512; CountDownLatch latch = new CountDownLatch(threadCount); AtomicBoolean success = new AtomicBoolean(true); ExecutorService executor = Executors.newFixedThreadPool(threadCount); - byte[] body = new byte[64]; for (int i = 0; i < threadCount; i++) { executor.submit(() -> { try { for (int j = 0; j < countPerThread; j++) { - RemotingCommand cmd = RemotingCommand.createRequestCommand(1, null); - cmd.setBody(body); - handler.channelRead(ctx, cmd); + handler.recordInbound(1, wireSize); } } catch (Exception e) { success.set(false); @@ -222,32 +116,10 @@ public void testConcurrentAccess() throws Exception { Assert.assertTrue(success.get()); long totalCount = threadCount * (long) countPerThread; + long totalTraffic = totalCount * wireSize; Assert.assertEquals("{1:" + totalCount + "}", handler.getInBoundSnapshotString()); - - long expectedTotalTraffic = totalCount * (RemotingCodeDistributionHandler.FIXED_OVERHEAD + 64); - Assert.assertEquals("{1:" + expectedTotalTraffic + "}", handler.getInBoundTrafficSnapshotString()); + Assert.assertEquals("{1:" + totalTraffic + "}", handler.getInBoundTrafficSnapshotString()); executor.shutdown(); } - - @Test - public void testRuntimeSwitchToggle() { - ChannelHandlerContext ctx = mock(ChannelHandlerContext.class); - - RemotingCommand cmd = RemotingCommand.createRequestCommand(500, null); - cmd.setBody(new byte[128]); - cmd.setRemark("hello"); - - // Record with detailed disabled - nettyServerConfig.setEnableDetailedTrafficSize(false); - handler.channelRead(ctx, cmd); - long sizeOff = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 128; - Assert.assertEquals("{500:" + sizeOff + "}", handler.getInBoundTrafficSnapshotString()); - - // Toggle on at runtime - nettyServerConfig.setEnableDetailedTrafficSize(true); - handler.channelRead(ctx, cmd); - long sizeOn = RemotingCodeDistributionHandler.FIXED_OVERHEAD + 128 + 5; // + "hello".length() - Assert.assertEquals("{500:" + sizeOn + "}", handler.getInBoundTrafficSnapshotString()); - } }