Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -42,18 +43,22 @@ public class ConsumerManager {
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
private final ConcurrentMap<String, ConsumerGroupInfo> consumerTable =
new ConcurrentHashMap<>(1024);
private final ConcurrentMap<String, Set<String>> topicGroupTable =
new ConcurrentHashMap<>(1024);
private final ConcurrentMap<String, ConsumerGroupInfo> consumerCompensationTable =
new ConcurrentHashMap<>(1024);
private final List<ConsumerIdsChangeListener> consumerIdsChangeListenerList = new CopyOnWriteArrayList<>();
protected final BrokerStatsManager brokerStatsManager;
private final long channelExpiredTimeout;
private final long subscriptionExpiredTimeout;
private final BrokerConfig brokerConfig;

public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener, long expiredTimeout) {
this.consumerIdsChangeListenerList.add(consumerIdsChangeListener);
this.brokerStatsManager = null;
this.channelExpiredTimeout = expiredTimeout;
this.subscriptionExpiredTimeout = expiredTimeout;
this.brokerConfig = null;
}

public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener,
Expand All @@ -62,6 +67,7 @@ public ConsumerManager(final ConsumerIdsChangeListener consumerIdsChangeListener
this.brokerStatsManager = brokerStatsManager;
this.channelExpiredTimeout = brokerConfig.getChannelExpiredTimeout();
this.subscriptionExpiredTimeout = brokerConfig.getSubscriptionExpiredTimeout();
this.brokerConfig = brokerConfig;
}

public ClientChannelInfo findChannel(final String group, final String clientId) {
Expand Down Expand Up @@ -130,28 +136,74 @@ public int findSubscriptionDataCount(final String group) {

public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
boolean removed = false;
if (this.brokerConfig != null && this.brokerConfig.isEnableFastChannelEventProcess()) {
List<String> groups = ClientChannelAttributeHelper.getConsumerGroups(channel);
if (this.brokerConfig.isPrintChannelGroups() && groups.size() >= 5 && groups.size() >= this.brokerConfig.getPrintChannelGroupsMinNum()) {
LOGGER.warn("channel close event, too many consumer groups one channel, {}, {}, {}", groups.size(), remoteAddr, groups);
}
for (String group : groups) {
if (null == group || group.length() == 0) {
continue;
}
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
continue;
}
ClientChannelInfo clientChannelInfo = consumerGroupInfo.doChannelCloseEvent(remoteAddr, channel);
if (clientChannelInfo != null) {
removed = true;
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo, consumerGroupInfo.getSubscribeTopics());
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}",
group);
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
clearTopicGroupTable(remove);
}
}
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
return removed;
}
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> next = it.next();
ConsumerGroupInfo info = next.getValue();
ClientChannelInfo clientChannelInfo = info.doChannelCloseEvent(remoteAddr, channel);
if (clientChannelInfo != null) {
removed = true;
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_UNREGISTER, next.getKey(), clientChannelInfo, info.getSubscribeTopics());
if (info.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(next.getKey());
if (remove != null) {
LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}",
next.getKey());
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, next.getKey());
clearTopicGroupTable(remove);
}
}

callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
if (!isBroadcastMode(info.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, next.getKey(), info.getAllChannel());
}
}
}
return removed;
}

private void clearTopicGroupTable(final ConsumerGroupInfo groupInfo) {
for (String subscribeTopic : groupInfo.getSubscribeTopics()) {
Set<String> groups = this.topicGroupTable.get(subscribeTopic);
if (groups != null) {
groups.remove(groupInfo.getGroupName());
}
if (groups != null && groups.isEmpty()) {
this.topicGroupTable.remove(subscribeTopic);
}
}
}

// compensate consumer info for consumer without heartbeat
public void compensateBasicConsumerInfo(String group, ConsumeType consumeType, MessageModel messageModel) {
ConsumerGroupInfo consumerGroupInfo = consumerCompensationTable.computeIfAbsent(group, ConsumerGroupInfo::new);
Expand All @@ -178,26 +230,38 @@ public boolean registerConsumer(final String group, final ClientChannelInfo clie
long start = System.currentTimeMillis();
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER, group, clientChannelInfo,
subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet()));
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}

for (SubscriptionData subscriptionData : subList) {
Set<String> groups = this.topicGroupTable.computeIfAbsent(subscriptionData.getTopic(), k -> ConcurrentHashMap.newKeySet());
groups.add(group);
}

boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
if (r1) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER, group, clientChannelInfo,
subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet()));
}
boolean r2 = false;
if (updateSubscription) {
r2 = consumerGroupInfo.updateSubscription(subList);
}

if (r1 || r2) {
if (isNotifyConsumerIdsChangedEnable) {
if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}

if (this.brokerConfig != null && this.brokerConfig.isEnableFastChannelEventProcess() && r1) {
ClientChannelAttributeHelper.addConsumerGroup(clientChannelInfo.getChannel(), group);
}

if (null != this.brokerStatsManager) {
this.brokerStatsManager.incConsumerRegisterTime((int) (System.currentTimeMillis() - start));
}
Expand All @@ -216,8 +280,14 @@ public boolean registerConsumerWithoutSub(final String group, final ClientChanne
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
consumerGroupInfo = prev != null ? prev : tmp;
}

for (SubscriptionData subscriptionData : consumerGroupInfo.getSubscriptionTable().values()) {
Set<String> groups = this.topicGroupTable.computeIfAbsent(subscriptionData.getTopic(), k -> ConcurrentHashMap.newKeySet());
groups.add(group);
}

boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
if (updateChannelRst && isNotifyConsumerIdsChangedEnable) {
if (updateChannelRst && isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
if (null != this.brokerStatsManager) {
Expand All @@ -237,12 +307,13 @@ public void unregisterConsumer(final String group, final ClientChannelInfo clien
if (consumerGroupInfo.getChannelInfoTable().isEmpty()) {
ConsumerGroupInfo remove = this.consumerTable.remove(group);
if (remove != null) {
LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}", group);
LOGGER.info("unregister consumer ok, no any connection, and remove consumer group, {}, {}", group, remove);

callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
clearTopicGroupTable(remove);
}
}
if (isNotifyConsumerIdsChangedEnable) {
if (isNotifyConsumerIdsChangedEnable && !isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
}
}
Expand Down Expand Up @@ -306,17 +377,7 @@ public void scanNotActiveChannel() {
}

public HashSet<String> queryTopicConsumeByWho(final String topic) {
HashSet<String> groups = new HashSet<>();
Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
while (it.hasNext()) {
Entry<String, ConsumerGroupInfo> entry = it.next();
ConcurrentMap<String, SubscriptionData> subscriptionTable =
entry.getValue().getSubscriptionTable();
if (subscriptionTable.containsKey(topic)) {
groups.add(entry.getKey());
}
}
return groups;
return new HashSet<>(Optional.ofNullable(topicGroupTable.get(topic)).orElseGet(HashSet::new));
}

public void appendConsumerIdsChangeListener(ConsumerIdsChangeListener listener) {
Expand All @@ -332,4 +393,8 @@ protected void callConsumerIdsChangeListener(ConsumerGroupEvent event, String gr
}
}
}

private boolean isBroadcastMode(final MessageModel messageModel) {
return MessageModel.BROADCASTING.equals(messageModel);
}
}