From d7eb54f5effc3528f14341736ba402d17b300f54 Mon Sep 17 00:00:00 2001 From: imzs Date: Wed, 18 Mar 2026 18:04:12 +0800 Subject: [PATCH] Use RocksDB CQ for LMQ within CombineConsumeQueueStore #10173 --- .../rocketmq/broker/BrokerController.java | 10 +- .../lite/AbstractLiteLifecycleManager.java | 3 +- .../lite/RocksDBLiteLifecycleManager.java | 26 +- .../processor/LiteManagerProcessor.java | 10 +- .../rocketmq/broker/lite/LiteTestUtil.java | 23 +- .../lite/RocksDBLiteLifecycleManagerTest.java | 35 ++- .../store/config/MessageStoreConfig.java | 19 ++ .../store/config/StorePathConfigHelper.java | 4 + .../store/queue/CombineConsumeQueueStore.java | 69 +++++- .../store/queue/MultiDispatchUtils.java | 3 + .../store/queue/RocksDBConsumeQueueStore.java | 14 +- .../queue/CombineConsumeQueueStoreTest.java | 131 ++++++++++ .../queue/RocksDBConsumeQueueStoreTest.java | 224 ++++++++++++++++++ .../test/base/IntegrationTestBase.java | 5 + 14 files changed, 543 insertions(+), 33 deletions(-) create mode 100644 store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStoreTest.java diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 2734e8b2463..8e2954d8ff0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -396,7 +396,7 @@ public BrokerController( this.authorizationMetadataManager = AuthorizationFactory.getMetadataManager(this.authConfig); this.topicRouteInfoManager = new TopicRouteInfoManager(this); this.liteSharding = new LiteShardingImpl(this, this.topicRouteInfoManager); - this.liteLifecycleManager = this.messageStoreConfig.isEnableRocksDBStore() ? + this.liteLifecycleManager = this.messageStoreConfig.isEnableRocksDBStore() || this.messageStoreConfig.isRocksdbCQDoubleWriteEnable() ? new RocksDBLiteLifecycleManager(this, this.liteSharding) : new LiteLifecycleManager(this, this.liteSharding); this.liteSubscriptionRegistry = new LiteSubscriptionRegistryImpl(this, liteLifecycleManager); this.liteSubscriptionCtlProcessor = new LiteSubscriptionCtlProcessor(this, liteSubscriptionRegistry); @@ -951,6 +951,8 @@ public boolean recoverAndInitService() throws CloneNotSupportedException { //scheduleMessageService load after messageStore load success result = result && this.scheduleMessageService.load(); + result = result && initLiteService(); + for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) { if (brokerAttachedPlugin != null) { result = result && brokerAttachedPlugin.load(); @@ -975,8 +977,6 @@ public boolean recoverAndInitService() throws CloneNotSupportedException { initialRequestPipeline(); - initLiteService(); - if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { @@ -1153,9 +1153,9 @@ private void initialRequestPipeline() { } } - private void initLiteService() { + private boolean initLiteService() { this.liteEventDispatcher.init(); - this.liteLifecycleManager.init(); + return this.liteLifecycleManager.init(); } public void registerProcessor() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java b/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java index e8fb2bde4d0..44a85a30bea 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java @@ -55,9 +55,10 @@ public AbstractLiteLifecycleManager(BrokerController brokerController, LiteShard this.liteSharding = liteSharding; } - public void init() { + public boolean init() { this.messageStore = brokerController.getMessageStore(); assert messageStore != null; + return true; } /** diff --git a/broker/src/main/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManager.java b/broker/src/main/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManager.java index fb0eb51540c..d8855021f21 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManager.java @@ -26,6 +26,7 @@ import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.store.RocksDBMessageStore; +import org.apache.rocketmq.store.queue.CombineConsumeQueueStore; import org.apache.rocketmq.store.queue.RocksDBConsumeQueueOffsetTable; import org.apache.rocketmq.store.queue.RocksDBConsumeQueueStore; import org.apache.rocketmq.tieredstore.TieredMessageStore; @@ -89,17 +90,30 @@ public List> collectExpiredLiteTopic() { } @Override - public void init() { + public boolean init() { super.init(); if (messageStore instanceof TieredMessageStore) { // only support TieredMessageStore plugin messageStore = ((TieredMessageStore) messageStore).getDefaultStore(); } - if (!(messageStore instanceof RocksDBMessageStore)) { - LOGGER.warn("init failed, not a RocksDB store. {}", messageStore.getClass()); - return; // startup with lite feature disabled + + RocksDBConsumeQueueStore queueStore; // underlay rocksdb consume queue store + if (messageStore instanceof RocksDBMessageStore) { // storeType = defaultRocksDB + queueStore = (RocksDBConsumeQueueStore) messageStore.getQueueStore(); + } else { // storeType = default && double write enable + if (!(messageStore.getQueueStore() instanceof CombineConsumeQueueStore)) { + LOGGER.warn("unexpected, not a CombineConsumeQueueStore. {}", messageStore.getQueueStore().getClass()); + return false; // abort startup + } + CombineConsumeQueueStore combineConsumeQueueStore = (CombineConsumeQueueStore) messageStore.getQueueStore(); + queueStore = combineConsumeQueueStore.getRocksDBConsumeQueueStore(); + if (!messageStore.getMessageStoreConfig().isCombineCQUseRocksdbForLmq() || null == queueStore) { + LOGGER.warn("unexpected, rocksdbCQ is not ready for LMQ."); + return false; // abort startup + } + LOGGER.info("LiteLifecycleManager init with CombineConsumeQueueStore."); } + try { - RocksDBConsumeQueueStore queueStore = (RocksDBConsumeQueueStore) messageStore.getQueueStore(); RocksDBConsumeQueueOffsetTable cqOffsetTable = (RocksDBConsumeQueueOffsetTable) FieldUtils.readField( FieldUtils.getField(RocksDBConsumeQueueStore.class, "rocksDBConsumeQueueOffsetTable", true), queueStore); @SuppressWarnings("unchecked") @@ -108,6 +122,8 @@ public void init() { maxCqOffsetTable = Collections.unmodifiableMap(innerMaxCqOffsetTable); } catch (Exception e) { LOGGER.error("LiteLifecycleManager-init error", e); + return false; } + return true; } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java index ac12983d61e..befc16e2716 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java @@ -54,6 +54,8 @@ import org.apache.rocketmq.remoting.protocol.header.GetParentTopicInfoRequestHeader; import org.apache.rocketmq.remoting.protocol.header.TriggerLiteDispatchRequestHeader; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.store.queue.CombineConsumeQueueStore; +import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface; import java.util.HashSet; import java.util.Map; @@ -106,12 +108,18 @@ protected RemotingCommand getBrokerLiteInfo(ChannelHandlerContext ctx, body.setCurrentLmqNum(brokerController.getMessageStore().getQueueStore().getLmqNum()); body.setLiteSubscriptionCount(brokerController.getLiteSubscriptionRegistry().getActiveSubscriptionNum()); body.setOrderInfoCount(brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().getOrderInfoCount()); - body.setCqTableSize(brokerController.getMessageStore().getQueueStore().getConsumeQueueTable().size()); body.setOffsetTableSize(brokerController.getConsumerOffsetManager().getOffsetTable().size()); body.setEventMapSize(brokerController.getLiteEventDispatcher().getEventMapSize()); body.setTopicMeta(LiteMetadataUtil.getTopicTtlMap(brokerController)); body.setGroupMeta(LiteMetadataUtil.getSubscriberGroupMap(brokerController)); + ConsumeQueueStoreInterface consumeQueueStore = brokerController.getMessageStore().getQueueStore(); + if (consumeQueueStore instanceof CombineConsumeQueueStore + && brokerController.getMessageStoreConfig().isCombineCQUseRocksdbForLmq()) { + consumeQueueStore = ((CombineConsumeQueueStore) consumeQueueStore).getRocksDBConsumeQueueStore(); // not null + } + body.setCqTableSize(consumeQueueStore.getConsumeQueueTable().size()); + response.setBody(body.encode()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteTestUtil.java b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteTestUtil.java index eabc5ea3f0d..ec6efb1fd54 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteTestUtil.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteTestUtil.java @@ -37,6 +37,20 @@ public class LiteTestUtil { + public static MessageStore buildMessageStore(final BrokerConfig brokerConfig, + MessageStoreConfig storeConfig, final ConcurrentMap topicConfigTable, + boolean isRocksDBStore) throws Exception { + + BrokerStatsManager brokerStatsManager = new BrokerStatsManager(brokerConfig); + MessageStore messageStore; + if (isRocksDBStore) { + messageStore = new RocksDBMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable); + } else { + messageStore = new DefaultMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable); + } + return messageStore; + } + public static MessageStore buildMessageStore(String storePathRootDir, final BrokerConfig brokerConfig, final ConcurrentMap topicConfigTable, boolean isRocksDBStore) throws Exception { MessageStoreConfig storeConfig = new MessageStoreConfig(); @@ -51,14 +65,7 @@ public static MessageStore buildMessageStore(String storePathRootDir, final Brok storeConfig.setEnableMultiDispatch(true); storeConfig.setStorePathRootDir(storePathRootDir); - BrokerStatsManager brokerStatsManager = new BrokerStatsManager(brokerConfig); - MessageStore messageStore; - if (isRocksDBStore) { - messageStore = new RocksDBMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable); - } else { - messageStore = new DefaultMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable); - } - return messageStore; + return buildMessageStore(brokerConfig, storeConfig, topicConfigTable, isRocksDBStore); } public static MessageExtBrokerInner buildMessage(String parentTopic, String liteTopic) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManagerTest.java index 90b4e47f6a3..47db902ebce 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/lite/RocksDBLiteLifecycleManagerTest.java @@ -28,8 +28,10 @@ import org.apache.rocketmq.common.attribute.TopicMessageType; import org.apache.rocketmq.common.lite.LiteUtil; import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore; import org.apache.rocketmq.store.plugin.MessageStorePluginContext; +import org.apache.rocketmq.store.queue.AbstractConsumeQueueStore; import org.apache.rocketmq.tieredstore.TieredMessageStore; import org.junit.AfterClass; import org.junit.Assert; @@ -125,9 +127,11 @@ public void testInit_otherStore() { when(brokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG); when(brokerController.getMessageStore()).thenReturn(pluginMessageStore); + when(pluginMessageStore.getQueueStore()).thenReturn(Mockito.mock(AbstractConsumeQueueStore.class)); RocksDBLiteLifecycleManager manager = new RocksDBLiteLifecycleManager(brokerController, liteSharding); - manager.init(); + + Assert.assertFalse(manager.init()); Assert.assertThrows(NullPointerException.class, () -> manager.getMaxOffsetInQueue("HW")); } @@ -239,4 +243,33 @@ public void testCleanByParentTopic() throws Exception { Assert.assertEquals(0, liteLifecycleManager.getMaxOffsetInQueue(lmqName)); } } + + @Test + public void testInit_combineConsumeQueueStore() throws Exception { + MessageStoreConfig storeConfig = new MessageStoreConfig(); + storeConfig.setStorePathRootDir( + System.getProperty("java.io.tmpdir") + File.separator + "store-rocksDBLifecycleTest-" + UUID.randomUUID()); + storeConfig.setRocksdbCQDoubleWriteEnable(true); + MessageStore messageStore = LiteTestUtil.buildMessageStore(BROKER_CONFIG, storeConfig, TOPIC_CONFIG_TABLE, false); + BrokerController brokerController = Mockito.mock(BrokerController.class); + LiteSharding liteSharding = Mockito.mock(LiteSharding.class); + when(brokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG); + when(brokerController.getMessageStore()).thenReturn(messageStore); + + // enable + storeConfig.setCombineCQUseRocksdbForLmq(true); + RocksDBLiteLifecycleManager manager = new RocksDBLiteLifecycleManager(brokerController, liteSharding); + Assert.assertTrue(manager.init()); + Assert.assertEquals(0, manager.getMaxOffsetInQueue(UUID.randomUUID().toString())); + + // disable + storeConfig.setCombineCQUseRocksdbForLmq(false); + RocksDBLiteLifecycleManager manager2 = new RocksDBLiteLifecycleManager(brokerController, liteSharding); + Assert.assertFalse(manager2.init()); + Assert.assertThrows(NullPointerException.class, () -> manager2.getMaxOffsetInQueue("HW")); + + messageStore.shutdown(); + messageStore.destroy(); + UtilAll.deleteFile(new File(storeConfig.getStorePathRootDir())); + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java index b6624daffbc..bff35d473c9 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -484,6 +484,7 @@ public class MessageStoreConfig { private String combineAssignOffsetCQType = StoreType.DEFAULT.getStoreType(); private boolean combineCQEnableCheckSelf = false; private int combineCQMaxExtraSearchCommitLogFiles = 3; + private boolean combineCQUseRocksdbForLmq = false; /** * If ConsumeQueueStore is RocksDB based, this option is to configure bottom-most tier compression type. @@ -520,6 +521,8 @@ public class MessageStoreConfig { // Shared byte buffer manager configuration private int sharedByteBufferNum = 16; + private boolean useSeparateStorePathForRocksdbCQ = false; + public String getRocksdbCompressionType() { return rocksdbCompressionType; } @@ -2110,6 +2113,14 @@ public void setCombineCQMaxExtraSearchCommitLogFiles(int combineCQMaxExtraSearch this.combineCQMaxExtraSearchCommitLogFiles = combineCQMaxExtraSearchCommitLogFiles; } + public boolean isCombineCQUseRocksdbForLmq() { + return combineCQUseRocksdbForLmq; + } + + public void setCombineCQUseRocksdbForLmq(boolean combineCQUseRocksdbForLmq) { + this.combineCQUseRocksdbForLmq = combineCQUseRocksdbForLmq; + } + public boolean isEnableLogConsumeQueueRepeatedlyBuildWhenRecover() { return enableLogConsumeQueueRepeatedlyBuildWhenRecover; } @@ -2302,4 +2313,12 @@ public boolean isAppendTopicForTimerDeleteKey() { public void setAppendTopicForTimerDeleteKey(boolean appendTopicForTimerDeleteKey) { this.appendTopicForTimerDeleteKey = appendTopicForTimerDeleteKey; } + + public boolean isUseSeparateStorePathForRocksdbCQ() { + return useSeparateStorePathForRocksdbCQ; + } + + public void setUseSeparateStorePathForRocksdbCQ(boolean useSeparateStorePathForRocksdbCQ) { + this.useSeparateStorePathForRocksdbCQ = useSeparateStorePathForRocksdbCQ; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java index 2f34e7dff54..78b6ed6ddcb 100644 --- a/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java +++ b/store/src/main/java/org/apache/rocketmq/store/config/StorePathConfigHelper.java @@ -31,6 +31,10 @@ public static String getStorePathBatchConsumeQueue(final String rootDir) { return rootDir + File.separator + "batchconsumequeue"; } + public static String getStorePathRocksDBConsumeQueue(final String rootDir) { + return rootDir + File.separator + "consumequeue_r"; + } + public static String getStorePathIndex(final String rootDir) { return rootDir + File.separator + "index"; } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java index 12b87d34740..f266f9d57a7 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.CheckRocksdbCqWriteResult; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.common.Pair; @@ -116,8 +117,17 @@ public CombineConsumeQueueStore(DefaultMessageStore messageStore) { throw new IllegalArgumentException("CombineConsumeQueue choosePreferCQ fail"); } - log.info("CombineConsumeQueueStore init, consumeQueueStoreList={}, currentReadStore={}, assignOffsetStore={}", - innerConsumeQueueStoreList, currentReadStore.getClass().getSimpleName(), assignOffsetStore.getClass().getSimpleName()); + if (messageStoreConfig.isCombineCQUseRocksdbForLmq() && null == rocksDBConsumeQueueStore) { + throw new IllegalArgumentException("CombineConsumeQueueStore rocksdbCQ is not ready for LMQ"); + } + + if (messageStoreConfig.isCombineCQUseRocksdbForLmq() && assignOffsetStore != consumeQueueStore) { + throw new IllegalArgumentException("CombineConsumeQueueStore maybe incorrect config"); + } + + log.info("CombineConsumeQueueStore init, consumeQueueStoreList={}, currentReadStore={}, assignOffsetStore={}, combineCQUseRocksdbForLmq={}", + innerConsumeQueueStoreList, currentReadStore.getClass().getSimpleName(), + assignOffsetStore.getClass().getSimpleName(), messageStoreConfig.isCombineCQUseRocksdbForLmq()); } @Override @@ -149,9 +159,21 @@ public boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp, } for (AbstractConsumeQueueStore store : innerConsumeQueueStoreList) { - if (store == assignOffsetStore || store.isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally)) { + if (store == assignOffsetStore) { + continue; + } + if (store.getMaxPhyOffsetInConsumeQueue() <= 0) { + log.warn("CombineConsumeQueueStore, the store hasn't started before, skip it, store={}", + store.getClass().getSimpleName()); + continue; + } + if (store.isMappedFileMatchedRecover(phyOffset, storeTimestamp, recoverNormally)) { continue; } + if (messageStoreConfig.isCombineCQUseRocksdbForLmq() && store instanceof RocksDBConsumeQueueStore) { + // rocksDBConsumeQueueStore acts as 'assignOffsetStore' for lmq, make sure it can be fully recovered + return false; + } // if other store is not matched for fully recovery, extraSearchCommitLogFilesForRecovery will minus 1 if (extraSearchCommitLogFilesForRecovery.getAndDecrement() <= 0) { // extraSearchCommitLogFilesForRecovery <= 0, only can read from assignOffsetStore @@ -160,7 +182,7 @@ public boolean isMappedFileMatchedRecover(long phyOffset, long storeTimestamp, assignOffsetStore.getClass().getSimpleName(), currentReadStore.getClass().getSimpleName()); throw new IllegalArgumentException(store.getClass().getSimpleName() + " not satisfied readable conditions, only can read from " + assignOffsetStore.getClass().getSimpleName()); } - log.warn("CombineConsumeQueueStore can not recover all inner store, maybe some inner store start haven’t started before, store={}", + log.warn("CombineConsumeQueueStore can not recover all inner store, maybe some inner stores haven’t started before, store={}", store.getClass().getSimpleName()); return true; } else { @@ -234,6 +256,11 @@ public boolean verifyAndInitOffsetForAllStore(boolean initializeOffset) throws R } if (maxOffset0 > 0) { + if (messageStoreConfig.isCombineCQUseRocksdbForLmq() && MixAll.isLmq(topic)) { + log.warn("CombineConsumeQueueStore checkAssignOffsetStore, LMQ offset not match. topic={}, maxOffsetInAssign={}, otherCQ={}, maxOffset0={}", + topic, maxOffsetInAssign, abstractConsumeQueueStore.getClass().getSimpleName(), maxOffset0); + continue; + } log.error("CombineConsumeQueueStore checkAssignOffsetStore fail, topic={}, queueId={}, maxOffsetInAssign={}, otherCQ={}, maxOffset0={}", topic, queueId, maxOffsetInAssign, abstractConsumeQueueStore.getClass().getSimpleName(), maxOffset0); result = false; @@ -341,12 +368,12 @@ public void increaseQueueOffset(MessageExtBrokerInner msg, short messageNum) { @Override public void increaseLmqOffset(String topic, int queueId, short delta) throws ConsumeQueueException { - assignOffsetStore.increaseLmqOffset(topic, queueId, delta); + getAssignOffsetStoreForTopic(topic).increaseLmqOffset(topic, queueId, delta); } @Override public long getLmqQueueOffset(String topic, int queueId) throws ConsumeQueueException { - return assignOffsetStore.getLmqQueueOffset(topic, queueId); + return getAssignOffsetStoreForTopic(topic).getLmqQueueOffset(topic, queueId); } @Override @@ -358,28 +385,28 @@ public void recoverOffsetTable(long minPhyOffset) { @Override public Long getMaxOffset(String topic, int queueId) throws ConsumeQueueException { - return currentReadStore.getMaxOffset(topic, queueId); + return getCurrentReadStoreForTopic(topic).getMaxOffset(topic, queueId); } @Override public long getMinOffsetInQueue(String topic, int queueId) throws RocksDBException { - return currentReadStore.getMinOffsetInQueue(topic, queueId); + return getCurrentReadStoreForTopic(topic).getMinOffsetInQueue(topic, queueId); } @Override public long getOffsetInQueueByTime(String topic, int queueId, long timestamp, BoundaryType boundaryType) throws RocksDBException { - return currentReadStore.getOffsetInQueueByTime(topic, queueId, timestamp, boundaryType); + return getCurrentReadStoreForTopic(topic).getOffsetInQueueByTime(topic, queueId, timestamp, boundaryType); } @Override public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int queueId) { - return currentReadStore.findOrCreateConsumeQueue(topic, queueId); + return getCurrentReadStoreForTopic(topic).findOrCreateConsumeQueue(topic, queueId); } @Override public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) { - return currentReadStore.getConsumeQueue(topic, queueId); + return getCurrentReadStoreForTopic(topic).getConsumeQueue(topic, queueId); } @Override @@ -393,11 +420,17 @@ public long getTotalSize() { @Override public int getLmqNum() { + if (messageStoreConfig.isCombineCQUseRocksdbForLmq()) { + return rocksDBConsumeQueueStore.getLmqNum(); + } return currentReadStore.getLmqNum(); } @Override public boolean isLmqExist(String lmqTopic) { + if (messageStoreConfig.isCombineCQUseRocksdbForLmq()) { + return rocksDBConsumeQueueStore.isLmqExist(lmqTopic); + } return currentReadStore.isLmqExist(lmqTopic); } @@ -554,4 +587,18 @@ private AbstractConsumeQueueStore getInnerStoreByStoreType(StoreType storeType) return null; } } + + private AbstractConsumeQueueStore getAssignOffsetStoreForTopic(String topic) { + if (messageStoreConfig.isCombineCQUseRocksdbForLmq() && MixAll.isLmq(topic)) { + return rocksDBConsumeQueueStore; + } + return assignOffsetStore; + } + + private AbstractConsumeQueueStore getCurrentReadStoreForTopic(String topic) { + if (messageStoreConfig.isCombineCQUseRocksdbForLmq() && MixAll.isLmq(topic)) { + return rocksDBConsumeQueueStore; + } + return currentReadStore; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java index 44397a2fce1..7edbf52490d 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/MultiDispatchUtils.java @@ -47,6 +47,9 @@ public static boolean checkMultiDispatchQueue(MessageStoreConfig messageStoreCon if (!isNeedHandleMultiDispatch(messageStoreConfig, dispatchRequest.getTopic())) { return false; } + if (messageStoreConfig.isRocksdbCQDoubleWriteEnable() && messageStoreConfig.isCombineCQUseRocksdbForLmq()) { + return false; // no need to dispatch file CQ here + } Map prop = dispatchRequest.getPropertiesMap(); if (prop == null || prop.isEmpty()) { return false; diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java index 48e9e60277a..8573ae81472 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java @@ -106,7 +106,19 @@ public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) { super(messageStore); messageStore.setNotifyMessageArriveInBatch(true); - this.storePath = StorePathConfigHelper.getStorePathConsumeQueue(messageStoreConfig.getStorePathRootDir()); + String root = messageStoreConfig.getStorePathRootDir(); + File checkFile; + if (messageStoreConfig.isUseSeparateStorePathForRocksdbCQ()) { + this.storePath = StorePathConfigHelper.getStorePathRocksDBConsumeQueue(root); + checkFile = new File(StorePathConfigHelper.getStorePathConsumeQueue(root) + File.separator + "CURRENT"); + } else { + this.storePath = StorePathConfigHelper.getStorePathConsumeQueue(root); + checkFile = new File(StorePathConfigHelper.getStorePathRocksDBConsumeQueue(root) + File.separator + "CURRENT"); + } + if (checkFile.isFile()) { // probably used rocksdb in original/separate path + throw new IllegalStateException("find RocksDBConsumeQueue in original/separate path, maybe incompatible config."); + } + this.rocksDBStorage = new ConsumeQueueRocksDBStorage(messageStore, storePath); this.rocksDBConsumeQueueTable = new RocksDBConsumeQueueTable(rocksDBStorage, messageStore); this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java index 2ca21b265ef..e7ac763a181 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java @@ -19,19 +19,24 @@ import java.io.File; import java.net.InetSocketAddress; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.attribute.CQType; import org.apache.rocketmq.common.constant.PermName; +import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageExtBrokerInner; import org.apache.rocketmq.store.ConsumeQueue; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.store.StoreType; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.junit.After; import org.junit.Assert; @@ -82,6 +87,24 @@ public void CombineConsumeQueueStore_EmptyLoadingCQTypes_ThrowsException() throw new CombineConsumeQueueStore(messageStore); } + @Test(expected = IllegalArgumentException.class) + public void CombineConsumeQueueStore_LoadingCQTypesNotContainsRocksdb_ThrowsException() throws Exception { + messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); + + messageStoreConfig.setCombineCQLoadingCQTypes(StoreType.DEFAULT.getStoreType()); + messageStoreConfig.setCombineCQUseRocksdbForLmq(true); + new CombineConsumeQueueStore(messageStore); + } + + @Test(expected = IllegalArgumentException.class) + public void CombineConsumeQueueStore_assignOffsetStoreIsRocksdb_ThrowsException() throws Exception { + messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); + + messageStoreConfig.setCombineCQUseRocksdbForLmq(true); + messageStoreConfig.setCombineAssignOffsetCQType(StoreType.DEFAULT_ROCKSDB.getStoreType()); + new CombineConsumeQueueStore(messageStore); + } + @Test public void CombineConsumeQueueStore_InitializesConsumeQueueStore() throws Exception { messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); @@ -156,6 +179,49 @@ public void testIterator() throws Exception { }); } + @Test + public void testIterator_combineCQUseRocksdbForLmq() throws Exception { + messageStoreConfig.setRocksdbCQDoubleWriteEnable(true); + messageStoreConfig.setCombineCQUseRocksdbForLmq(true); + messageStoreConfig.setEnableLmq(true); + messageStoreConfig.setEnableMultiDispatch(true); + messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); + messageStore.load(); + messageStore.start(); + + String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID(); + Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(lmqName, queueId)); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(lmqName, queueId)); + + ConsumeQueueInterface consumeQueue = messageStore.getConsumeQueue(lmqName, queueId); + Assert.assertEquals(CQType.RocksDBCQ, consumeQueue.getCQType()); + Assert.assertEquals(0, consumeQueue.getMaxOffsetInQueue()); + Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue()); + Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(lmqName, queueId)); + Assert.assertEquals(0, messageStore.getMinOffsetInQueue(lmqName, queueId)); + + for (int i = 0; i < msgNum; i++) { + Map propertyMap = new HashMap<>(); + propertyMap.put(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqName); + propertyMap.put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, String.valueOf(i)); + DispatchRequest request = new DispatchRequest(topic, queueId, i * msgSize, msgSize, i, + System.currentTimeMillis(), i, null, null, 0, 0, propertyMap); + messageStore.getQueueStore().putMessagePositionInfoWrapper(request); + } + + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + checkCQ(consumeQueue, msgNum, msgSize); + + CombineConsumeQueueStore combineConsumeQueueStore = (CombineConsumeQueueStore) messageStore.getQueueStore(); + ConsumeQueueInterface rocksDBConsumeQueue = combineConsumeQueueStore.getRocksDBConsumeQueueStore().getConsumeQueue(lmqName, queueId); + ConsumeQueueInterface fileConsumeQueue = combineConsumeQueueStore.getConsumeQueueStore().getConsumeQueue(lmqName, queueId); + + Assert.assertEquals(consumeQueue, rocksDBConsumeQueue); + Assert.assertNull(fileConsumeQueue); // not exist in file CQ store + Assert.assertEquals(msgNum, rocksDBConsumeQueue.getMaxOffsetInQueue()); + }); + } + private void checkCQ(ConsumeQueueInterface consumeQueue, int msgNum, int msgSize) { Assert.assertEquals(0, consumeQueue.getMinLogicOffset()); @@ -356,4 +422,69 @@ public void testVerifyAndInitOffsetForAllStore() throws Exception { messageStore.shutdown(); } } + + @Test + public void testLmqOffset_combineCQUseRocksdbForLmq() throws Exception { + messageStoreConfig.setRocksdbCQDoubleWriteEnable(true); + messageStoreConfig.setCombineCQUseRocksdbForLmq(true); + messageStoreConfig.setEnableLmq(true); + messageStoreConfig.setEnableMultiDispatch(true); + messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); + messageStore.load(); + messageStore.start(); + CombineConsumeQueueStore combineConsumeQueueStore = (CombineConsumeQueueStore) messageStore.getQueueStore(); + ConsumeQueueStore consumeQueueStore = combineConsumeQueueStore.getConsumeQueueStore(); + RocksDBConsumeQueueStore rocksDBConsumeQueueStore = combineConsumeQueueStore.getRocksDBConsumeQueueStore(); + + String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID(); + Assert.assertEquals(0, combineConsumeQueueStore.getLmqQueueOffset(lmqName, queueId)); + Assert.assertEquals(0, consumeQueueStore.getLmqQueueOffset(lmqName, queueId)); + Assert.assertEquals(0, rocksDBConsumeQueueStore.getLmqQueueOffset(lmqName, queueId)); + + combineConsumeQueueStore.increaseLmqOffset(lmqName, queueId, (short) 100); + Assert.assertEquals(100, combineConsumeQueueStore.getLmqQueueOffset(lmqName, queueId)); + Assert.assertEquals(0, consumeQueueStore.getLmqQueueOffset(lmqName, queueId)); + Assert.assertEquals(100, rocksDBConsumeQueueStore.getLmqQueueOffset(lmqName, queueId)); + } + + @Test + public void testLmqNum_combineCQUseRocksdbForLmq() throws Exception { + messageStoreConfig.setRocksdbCQDoubleWriteEnable(true); + messageStoreConfig.setCombineCQUseRocksdbForLmq(true); + messageStoreConfig.setEnableLmq(true); + messageStoreConfig.setEnableMultiDispatch(true); + messageStore = (DefaultMessageStore) createMessageStore(null, false, topicConfigTableMap, messageStoreConfig); + messageStore.load(); + messageStore.start(); + CombineConsumeQueueStore combineConsumeQueueStore = (CombineConsumeQueueStore) messageStore.getQueueStore(); + ConsumeQueueStore consumeQueueStore = combineConsumeQueueStore.getConsumeQueueStore(); + RocksDBConsumeQueueStore rocksDBConsumeQueueStore = combineConsumeQueueStore.getRocksDBConsumeQueueStore(); + + String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID(); + Assert.assertEquals(0, combineConsumeQueueStore.getLmqNum()); + Assert.assertEquals(0, rocksDBConsumeQueueStore.getLmqNum()); + Assert.assertEquals(0, consumeQueueStore.getLmqNum()); + Assert.assertFalse(combineConsumeQueueStore.isLmqExist(lmqName)); + Assert.assertFalse(rocksDBConsumeQueueStore.isLmqExist(lmqName)); + Assert.assertFalse(consumeQueueStore.isLmqExist(lmqName)); + + for (int i = 0; i < msgNum; i++) { + Map propertyMap = new HashMap<>(); + propertyMap.put(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqName); + propertyMap.put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, String.valueOf(i)); + DispatchRequest request = new DispatchRequest(topic, queueId, i * msgSize, msgSize, i, + System.currentTimeMillis(), i, null, null, 0, 0, propertyMap); + messageStore.getQueueStore().putMessagePositionInfoWrapper(request); + } + + await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + Assert.assertEquals(1, combineConsumeQueueStore.getLmqNum()); + Assert.assertEquals(1, rocksDBConsumeQueueStore.getLmqNum()); + Assert.assertEquals(0, consumeQueueStore.getLmqNum()); + + Assert.assertTrue(combineConsumeQueueStore.isLmqExist(lmqName)); + Assert.assertTrue(rocksDBConsumeQueueStore.isLmqExist(lmqName)); + Assert.assertFalse(consumeQueueStore.isLmqExist(lmqName)); + }); + } } diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStoreTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStoreTest.java new file mode 100644 index 00000000000..9431f7ed048 --- /dev/null +++ b/store/src/test/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStoreTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.store.queue; + +import java.io.File; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.attribute.CQType; +import org.apache.rocketmq.common.TopicConfig; +import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageExtBrokerInner; +import org.apache.rocketmq.store.DispatchRequest; +import org.apache.rocketmq.store.LmqDispatch; +import org.apache.rocketmq.store.MessageStore; +import org.apache.rocketmq.store.StoreType; +import org.apache.rocketmq.store.config.MessageStoreConfig; +import org.apache.rocketmq.store.config.StorePathConfigHelper; +import org.apache.rocketmq.store.exception.ConsumeQueueException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertThrows; + +public class RocksDBConsumeQueueStoreTest extends QueueTestBase { + + private MessageStore messageStore; + private ConcurrentMap topicConfigTableMap; + + @Before + public void init() throws Exception { + MessageStoreConfig storeConfig = new MessageStoreConfig(); + storeConfig.setStoreType(StoreType.DEFAULT_ROCKSDB.getStoreType()); + storeConfig.setEnableCompaction(false); + storeConfig.setEnableLmq(true); + storeConfig.setEnableMultiDispatch(true); + this.topicConfigTableMap = new ConcurrentHashMap<>(); + messageStore = createMessageStore(null, false, topicConfigTableMap, storeConfig); + messageStore.load(); + messageStore.start(); + } + + @After + public void destroy() { + messageStore.shutdown(); + messageStore.destroy(); + } + + @Test + public void testStorePath_correctConfig() { + String root = messageStore.getMessageStoreConfig().getStorePathRootDir(); + String originalPath = StorePathConfigHelper.getStorePathConsumeQueue(root); + File dir = new File(originalPath); + File checkFile = new File(StorePathConfigHelper.getStorePathRocksDBConsumeQueue(root) + File.separator + "CURRENT"); + assertTrue(dir.exists() || !checkFile.isFile()); + } + + @Test + public void testStorePath_incompatibleConfig() throws Exception { + MessageStoreConfig storeConfig = new MessageStoreConfig(); + storeConfig.setStoreType(StoreType.DEFAULT_ROCKSDB.getStoreType()); + storeConfig.setUseSeparateStorePathForRocksdbCQ(true); + storeConfig.setEnableCompaction(false); + this.topicConfigTableMap = new ConcurrentHashMap<>(); + + String root = createBaseDir(); + makeSureFileExists(StorePathConfigHelper.getStorePathConsumeQueue(root) + File.separator + "CURRENT"); + IllegalStateException exception = assertThrows(IllegalStateException.class, () -> + createMessageStore(root, false, topicConfigTableMap, storeConfig) + ); + assertTrue(exception.getMessage().contains("incompatible config")); + + storeConfig.setUseSeparateStorePathForRocksdbCQ(false); + String root2 = createBaseDir(); + makeSureFileExists(StorePathConfigHelper.getStorePathRocksDBConsumeQueue(root2) + File.separator + "CURRENT"); + exception = assertThrows(IllegalStateException.class, () -> + createMessageStore(root2, false, topicConfigTableMap, storeConfig) + ); + assertTrue(exception.getMessage().contains("incompatible config")); + } + + @Test + public void testFindOrCreateConsumeQueue() { + String topic = "test-topic-" + UUID.randomUUID(); + ConsumeQueueInterface cq = messageStore.getQueueStore().findOrCreateConsumeQueue(topic, 0); + assertNotNull(cq); + assertEquals(CQType.RocksDBCQ, cq.getCQType()); + } + + @Test + public void testPutMessagePositionInfoWrapper_basic() throws Exception { + String topic = "test-topic-" + UUID.randomUUID(); + int msgNum = 10; + int msgSize = 100; + int queueId = 0; + + for (int i = 0; i < msgNum; i++) { + DispatchRequest request = new DispatchRequest(topic, queueId, (long) i * msgSize, msgSize, i, + System.currentTimeMillis(), i, "key", "uk", 0, 0, null); + messageStore.getQueueStore().putMessagePositionInfoWrapper(request); + } + + RocksDBConsumeQueueStore store = (RocksDBConsumeQueueStore) messageStore.getQueueStore(); + await().atMost(5, SECONDS).untilAsserted(() -> + assertEquals(msgNum, store.getMaxOffsetInQueue(topic, queueId)) + ); + } + + @Test + public void testPutMessagePositionInfoWrapper_lmq() throws Exception { + String topic = "test-topic-" + UUID.randomUUID(); + int msgNum = 10; + int msgSize = 100; + int queueId = 0; + + String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID(); + for (int i = 0; i < msgNum; i++) { + Map propertyMap = new HashMap<>(); + propertyMap.put(MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqName); + propertyMap.put(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET, String.valueOf(i)); + DispatchRequest request = new DispatchRequest(topic, queueId, (long) i * msgSize, msgSize, i, + System.currentTimeMillis(), i, "key", "uk", 0, 0, propertyMap); + messageStore.getQueueStore().putMessagePositionInfoWrapper(request); + } + + RocksDBConsumeQueueStore store = (RocksDBConsumeQueueStore) messageStore.getQueueStore(); + await().atMost(5, SECONDS).untilAsserted(() -> { + assertEquals(msgNum, store.getMaxOffsetInQueue(topic, queueId)); + assertTrue(store.isLmqExist(lmqName)); + assertEquals(msgNum, store.getMaxOffsetInQueue(lmqName, MixAll.LMQ_QUEUE_ID)); + }); + } + + @Test + public void testGetMaxOffset_emptyQueue() throws ConsumeQueueException { + String topic = "test-topic-" + UUID.randomUUID(); + long maxOffset = messageStore.getQueueStore().getMaxOffset(topic, 0); + assertEquals(0L, maxOffset); + } + + @Test + public void testGetMinOffsetInQueue_emptyQueue() throws Exception { + String topic = "test-topic-" + UUID.randomUUID(); + long minOffset = messageStore.getQueueStore().getMinOffsetInQueue(topic, 0); + assertEquals(0L, minOffset); + } + + @Test + public void testDeleteTopic() throws Exception { + RocksDBConsumeQueueStore store = (RocksDBConsumeQueueStore) messageStore.getQueueStore(); + String topic = "test-topic-" + UUID.randomUUID(); + String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID(); + + MessageExtBrokerInner msg = buildMessage(topic, -1); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqName); + LmqDispatch.wrapLmqDispatch(messageStore, msg); + messageStore.putMessage(msg); + + await().atMost(5, SECONDS).until(fullyDispatched(messageStore)); + assertEquals(1, store.getLmqNum()); + assertEquals(1, store.getMaxOffsetInQueue(topic, 0)); + assertEquals(1, store.getMaxOffsetInQueue(lmqName, 0)); + assertTrue(messageStore.getQueueStore().isLmqExist(lmqName)); + + messageStore.deleteTopics(java.util.Collections.singleton(topic)); + messageStore.deleteTopics(java.util.Collections.singleton(lmqName)); + assertEquals(0, messageStore.getQueueStore().getLmqNum()); + assertFalse(messageStore.getQueueStore().isLmqExist(lmqName)); + } + + @Test + public void testGetLmqNum_reload() throws Exception { + String topic = "test-topic-" + UUID.randomUUID(); + String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID(); + + MessageExtBrokerInner msg = buildMessage(topic, -1); + MessageAccessor.putProperty(msg, MessageConst.PROPERTY_INNER_MULTI_DISPATCH, lmqName); + LmqDispatch.wrapLmqDispatch(messageStore, msg); + messageStore.putMessage(msg); + + await().atMost(5, SECONDS).until(fullyDispatched(messageStore)); + assertEquals(1, messageStore.getQueueStore().getLmqNum()); + + String root = messageStore.getMessageStoreConfig().getStorePathRootDir(); + MessageStoreConfig config = messageStore.getMessageStoreConfig(); + messageStore.shutdown(); + + MessageStore reloadStore = createMessageStore(root, false, topicConfigTableMap, config); + reloadStore.load(); + reloadStore.start(); + + assertEquals(1, reloadStore.getQueueStore().getLmqNum()); + assertTrue(messageStore.getQueueStore().isLmqExist(lmqName)); + assertNull(reloadStore.getQueueStore().getConsumeQueueTable().get(lmqName)); + messageStore = reloadStore; + } +} \ No newline at end of file diff --git a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java index 4b623325258..bb78d39155c 100644 --- a/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java +++ b/test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java @@ -171,6 +171,11 @@ public static BrokerController createAndStartBroker(String nsAddr) { storeConfig.setEnableLmq(Boolean.valueOf(System.getProperty("enableLmq", "false"))); storeConfig.setEnableMultiDispatch(Boolean.valueOf(System.getProperty("enableMultiDispatch", "false"))); storeConfig.setStoreType(System.getProperty("storeType", "default")); + storeConfig.setRocksdbCQDoubleWriteEnable(Boolean.parseBoolean(System.getProperty("rocksdbCQDoubleWriteEnable", "false"))); + storeConfig.setCombineCQLoadingCQTypes(System.getProperty("combineCQLoadingCQTypes", "default")); + storeConfig.setCombineCQUseRocksdbForLmq(Boolean.parseBoolean(System.getProperty("combineCQUseRocksdbForLmq", "false"))); + storeConfig.setCombineAssignOffsetCQType(System.getProperty("combineAssignOffsetCQType", "default")); + storeConfig.setCombineCQPreferCQType(System.getProperty("combineCQPreferCQType", "default")); return createAndStartBroker(storeConfig, brokerConfig); }