diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index 42e71e7e997..2e6ceda124f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map.Entry; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -42,18 +43,22 @@ public class ConsumerManager { private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private final ConcurrentMap consumerTable = new ConcurrentHashMap<>(1024); + private final ConcurrentMap> topicGroupTable = + new ConcurrentHashMap<>(1024); private final ConcurrentMap consumerCompensationTable = new ConcurrentHashMap<>(1024); private final List consumerIdsChangeListenerList = new CopyOnWriteArrayList<>(); protected final BrokerStatsManager brokerStatsManager; private final long channelExpiredTimeout; private final long subscriptionExpiredTimeout; + private final BrokerConfig brokerConfig; public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, long expiredTimeout) { this.consumerIdsChangeListenerList.add(consumerIdsChangeListener); this.brokerStatsManager = null; this.channelExpiredTimeout = expiredTimeout; this.subscriptionExpiredTimeout = expiredTimeout; + this.brokerConfig = null; } public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, @@ -62,6 +67,7 @@ public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener this.brokerStatsManager = brokerStatsManager; this.channelExpiredTimeout = brokerConfig.getChannelExpiredTimeout(); this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout(); + this.brokerConfig = brokerConfig; } public ClientChannelInfo findChannel(final String group, final String clientId) { @@ -130,12 +136,44 @@ public int findSubscriptionDataCount(final String group) { public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) { boolean removed = false; + if (this.brokerConfig != null && this.brokerConfig.isEnableFastChannelEventProcess()) { + List groups = ClientChannelAttributeHelper.getConsumerGroups(channel); + if (this.brokerConfig.isPrintChannelGroups() && groups.size() >= 5 && groups.size() >= this.brokerConfig.getPrintChannelGroupsMinNum()) { + LOGGER.warn("channel close event, too many consumer groups one channel, {}, {}, {}", groups.size(), remoteAddr, groups); + } + for (String group : groups) { + if (null == group || group.length() == 0) { + continue; + } + ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); + if (null == consumerGroupInfo) { + continue; + } + ClientChannelInfo clientChannelInfo = consumerGroupInfo.doChannelCloseEvent(remoteAddr, channel); + if (clientChannelInfo != null) { + removed = true; + callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo, consumerGroupInfo.getSubscribeTopics()); + if (consumerGroupInfo.getChannelInfoTable().isEmpty()) { + ConsumerGroupInfo remove = this.consumerTable.remove(group); + if (remove != null) { + LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}", + group); + callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group); + clearTopicGroupTable(remove); + } + } + callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); + } + } + return removed; + } Iterator> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); ConsumerGroupInfo info = next.getValue(); ClientChannelInfo clientChannelInfo = info.doChannelCloseEvent(remoteAddr, channel); if (clientChannelInfo != null) { + removed = true; callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, next.getKey(), clientChannelInfo, info.getSubscribeTopics()); if (info.getChannelInfoTable().isEmpty()) { ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey()); @@ -143,15 +181,29 @@ public boolean doChannelCloseEvent(final String remoteAddr, final Channel channe LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}", next.getKey()); callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, next.getKey()); + clearTopicGroupTable(remove); } } - - callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel()); + if (!isBroadcastMode(info.getMessageModel())) { + callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel()); + } } } return removed; } + private void clearTopicGroupTable(final ConsumerGroupInfo groupInfo) { + for (String subscribeTopic : groupInfo.getSubscribeTopics()) { + Set groups = this.topicGroupTable.get(subscribeTopic); + if (groups != null) { + groups.remove(groupInfo.getGroupName()); + } + if (groups != null && groups.isEmpty()) { + this.topicGroupTable.remove(subscribeTopic); + } + } + } + // compensate consumer info for consumer without heartbeat public void compensateBasicConsumerInfo(String group, ConsumeType consumeType, MessageModel messageModel) { ConsumerGroupInfo consumerGroupInfo = consumerCompensationTable.computeIfAbsent(group, ConsumerGroupInfo::new); @@ -178,26 +230,38 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie long start = System.currentTimeMillis(); ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (null == consumerGroupInfo) { - callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER, group, clientChannelInfo, - subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet())); ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } + for (SubscriptionData subscriptionData : subList) { + Set groups = this.topicGroupTable.computeIfAbsent(subscriptionData.getTopic(), k -> ConcurrentHashMap.newKeySet()); + groups.add(group); + } + boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); + if (r1) { + callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER, group, clientChannelInfo, + subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet())); + } boolean r2 = false; if (updateSubscription) { r2 = consumerGroupInfo.updateSubscription(subList); } if (r1 || r2) { - if (isNotifyConsumerIdsChangedEnable) { + if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) { callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } + + if (this.brokerConfig != null && this.brokerConfig.isEnableFastChannelEventProcess() && r1) { + ClientChannelAttributeHelper.addConsumerGroup(clientChannelInfo.getChannel(), group); + } + if (null != this.brokerStatsManager) { this.brokerStatsManager.incConsumerRegisterTime((int) (System.currentTimeMillis() - start)); } @@ -216,8 +280,14 @@ public boolean registerConsumerWithoutSub(final String group, final ClientChanne ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } + + for (SubscriptionData subscriptionData : consumerGroupInfo.getSubscriptionTable().values()) { + Set groups = this.topicGroupTable.computeIfAbsent(subscriptionData.getTopic(), k -> ConcurrentHashMap.newKeySet()); + groups.add(group); + } + boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); - if (updateChannelRst && isNotifyConsumerIdsChangedEnable) { + if (updateChannelRst && isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) { callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } if (null != this.brokerStatsManager) { @@ -237,12 +307,13 @@ public void unregisterConsumer(final String group, final ClientChannelInfo clien if (consumerGroupInfo.getChannelInfoTable().isEmpty()) { ConsumerGroupInfo remove = this.consumerTable.remove(group); if (remove != null) { - LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}", group); + LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}, {}", group, remove); callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group); + clearTopicGroupTable(remove); } } - if (isNotifyConsumerIdsChangedEnable) { + if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) { callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } @@ -306,17 +377,7 @@ public void scanNotActiveChannel() { } public HashSet queryTopicConsumeByWho(final String topic) { - HashSet groups = new HashSet<>(); - Iterator> it = this.consumerTable.entrySet().iterator(); - while (it.hasNext()) { - Entry entry = it.next(); - ConcurrentMap subscriptionTable = - entry.getValue().getSubscriptionTable(); - if (subscriptionTable.containsKey(topic)) { - groups.add(entry.getKey()); - } - } - return groups; + return new HashSet<>(Optional.ofNullable(topicGroupTable.get(topic)).orElseGet(HashSet::new)); } public void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener) { @@ -332,4 +393,8 @@ protected void callConsumerIdsChangeListener(ConsumerGroupEvent event, String gr } } } + + private boolean isBroadcastMode(final MessageModel messageModel) { + return MessageModel.BROADCASTING.equals(messageModel); + } }