From 0a5f83f183b5b12a368c4d8203af2576dd67d8ef Mon Sep 17 00:00:00 2001 From: daguimu Date: Wed, 25 Mar 2026 19:20:15 +0800 Subject: [PATCH] [ISSUE #10207] Fix potential NPE in HA connections when remote address is null getRemoteSocketAddress() can return null if the socket is not connected. Add null check before calling toString() in DefaultHAConnection and AutoSwitchHAConnection constructors to prevent NullPointerException. Fixes #10207 --- .../store/ha/DefaultHAConnection.java | 4 +- .../ha/autoswitch/AutoSwitchHAConnection.java | 4 +- .../store/ha/DefaultHAConnectionTest.java | 60 +++++++++++++++++++ 3 files changed, 66 insertions(+), 2 deletions(-) create mode 100644 store/src/test/java/org/apache/rocketmq/store/ha/DefaultHAConnectionTest.java diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java index 5dd24410ed6..942b8400e48 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/DefaultHAConnection.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.store.ha; import java.io.IOException; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -61,7 +62,8 @@ public class DefaultHAConnection implements HAConnection { public DefaultHAConnection(final DefaultHAService haService, final SocketChannel socketChannel) throws IOException { this.haService = haService; this.socketChannel = socketChannel; - this.clientAddress = this.socketChannel.socket().getRemoteSocketAddress().toString(); + SocketAddress remoteAddress = this.socketChannel.socket().getRemoteSocketAddress(); + this.clientAddress = remoteAddress != null ? remoteAddress.toString() : ""; this.socketChannel.configureBlocking(false); this.socketChannel.socket().setSoLinger(false, -1); this.socketChannel.socket().setTcpNoDelay(true); diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java index cc55937aebb..a4388e07fb0 100644 --- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java +++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java @@ -18,6 +18,7 @@ package org.apache.rocketmq.store.ha.autoswitch; import java.io.IOException; +import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -106,7 +107,8 @@ public AutoSwitchHAConnection(AutoSwitchHAService haService, SocketChannel socke this.haService = haService; this.socketChannel = socketChannel; this.epochCache = epochCache; - this.clientAddress = this.socketChannel.socket().getRemoteSocketAddress().toString(); + SocketAddress remoteAddress = this.socketChannel.socket().getRemoteSocketAddress(); + this.clientAddress = remoteAddress != null ? remoteAddress.toString() : ""; this.socketChannel.configureBlocking(false); this.socketChannel.socket().setSoLinger(false, -1); this.socketChannel.socket().setTcpNoDelay(true); diff --git a/store/src/test/java/org/apache/rocketmq/store/ha/DefaultHAConnectionTest.java b/store/src/test/java/org/apache/rocketmq/store/ha/DefaultHAConnectionTest.java new file mode 100644 index 00000000000..941477abd91 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/ha/DefaultHAConnectionTest.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.ha; + +import java.nio.channels.SocketChannel; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.rocketmq.store.DefaultMessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class DefaultHAConnectionTest { + + private SocketChannel socketChannel; + private DefaultHAConnection connection; + + @After + public void tearDown() throws Exception { + if (socketChannel != null && socketChannel.isOpen()) { + socketChannel.close(); + } + } + + @Test + public void testConstructorWithNullRemoteAddress() throws Exception { + DefaultHAService haService = mock(DefaultHAService.class); + DefaultMessageStore messageStore = mock(DefaultMessageStore.class); + MessageStoreConfig storeConfig = new MessageStoreConfig(); + when(haService.getDefaultMessageStore()).thenReturn(messageStore); + when(messageStore.getMessageStoreConfig()).thenReturn(storeConfig); + when(haService.getConnectionCount()).thenReturn(new AtomicInteger(0)); + + // SocketChannel.open() creates a non-connected channel, + // so getRemoteSocketAddress() returns null + socketChannel = SocketChannel.open(); + connection = new DefaultHAConnection(haService, socketChannel); + + assertNotNull(connection); + assertEquals("", connection.getClientAddress()); + } +}