Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ public BrokerController(
this.authorizationMetadataManager = AuthorizationFactory.getMetadataManager(this.authConfig);
this.topicRouteInfoManager = new TopicRouteInfoManager(this);
this.liteSharding = new LiteShardingImpl(this, this.topicRouteInfoManager);
this.liteLifecycleManager = this.messageStoreConfig.isEnableRocksDBStore() ?
this.liteLifecycleManager = this.messageStoreConfig.isEnableRocksDBStore() || this.messageStoreConfig.isRocksdbCQDoubleWriteEnable() ?
new RocksDBLiteLifecycleManager(this, this.liteSharding) : new LiteLifecycleManager(this, this.liteSharding);
this.liteSubscriptionRegistry = new LiteSubscriptionRegistryImpl(this, liteLifecycleManager);
this.liteSubscriptionCtlProcessor = new LiteSubscriptionCtlProcessor(this, liteSubscriptionRegistry);
Expand Down Expand Up @@ -951,6 +951,8 @@ public boolean recoverAndInitService() throws CloneNotSupportedException {
//scheduleMessageService load after messageStore load success
result = result && this.scheduleMessageService.load();

result = result && initLiteService();

for (BrokerAttachedPlugin brokerAttachedPlugin : brokerAttachedPlugins) {
if (brokerAttachedPlugin != null) {
result = result && brokerAttachedPlugin.load();
Expand All @@ -975,8 +977,6 @@ public boolean recoverAndInitService() throws CloneNotSupportedException {

initialRequestPipeline();

initLiteService();

if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
// Register a listener to reload SslContext
try {
Expand Down Expand Up @@ -1153,9 +1153,9 @@ private void initialRequestPipeline() {
}
}

private void initLiteService() {
private boolean initLiteService() {
this.liteEventDispatcher.init();
this.liteLifecycleManager.init();
return this.liteLifecycleManager.init();
}

public void registerProcessor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ public AbstractLiteLifecycleManager(BrokerController brokerController, LiteShard
this.liteSharding = liteSharding;
}

public void init() {
public boolean init() {
this.messageStore = brokerController.getMessageStore();
assert messageStore != null;
return true;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.RocksDBMessageStore;
import org.apache.rocketmq.store.queue.CombineConsumeQueueStore;
import org.apache.rocketmq.store.queue.RocksDBConsumeQueueOffsetTable;
import org.apache.rocketmq.store.queue.RocksDBConsumeQueueStore;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
Expand Down Expand Up @@ -89,17 +90,30 @@ public List<Pair<String, String>> collectExpiredLiteTopic() {
}

@Override
public void init() {
public boolean init() {
super.init();
if (messageStore instanceof TieredMessageStore) { // only support TieredMessageStore plugin
messageStore = ((TieredMessageStore) messageStore).getDefaultStore();
}
if (!(messageStore instanceof RocksDBMessageStore)) {
LOGGER.warn("init failed, not a RocksDB store. {}", messageStore.getClass());
return; // startup with lite feature disabled

RocksDBConsumeQueueStore queueStore; // underlay rocksdb consume queue store
if (messageStore instanceof RocksDBMessageStore) { // storeType = defaultRocksDB
queueStore = (RocksDBConsumeQueueStore) messageStore.getQueueStore();
} else { // storeType = default && double write enable
if (!(messageStore.getQueueStore() instanceof CombineConsumeQueueStore)) {
LOGGER.warn("unexpected, not a CombineConsumeQueueStore. {}", messageStore.getQueueStore().getClass());
return false; // abort startup
}
CombineConsumeQueueStore combineConsumeQueueStore = (CombineConsumeQueueStore) messageStore.getQueueStore();
queueStore = combineConsumeQueueStore.getRocksDBConsumeQueueStore();
if (!messageStore.getMessageStoreConfig().isCombineCQUseRocksdbForLmq() || null == queueStore) {
LOGGER.warn("unexpected, rocksdbCQ is not ready for LMQ.");
return false; // abort startup
}
LOGGER.info("LiteLifecycleManager init with CombineConsumeQueueStore.");
}

try {
RocksDBConsumeQueueStore queueStore = (RocksDBConsumeQueueStore) messageStore.getQueueStore();
RocksDBConsumeQueueOffsetTable cqOffsetTable = (RocksDBConsumeQueueOffsetTable) FieldUtils.readField(
FieldUtils.getField(RocksDBConsumeQueueStore.class, "rocksDBConsumeQueueOffsetTable", true), queueStore);
@SuppressWarnings("unchecked")
Expand All @@ -108,6 +122,8 @@ public void init() {
maxCqOffsetTable = Collections.unmodifiableMap(innerMaxCqOffsetTable);
} catch (Exception e) {
LOGGER.error("LiteLifecycleManager-init error", e);
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.apache.rocketmq.remoting.protocol.header.GetParentTopicInfoRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.TriggerLiteDispatchRequestHeader;
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.store.queue.CombineConsumeQueueStore;
import org.apache.rocketmq.store.queue.ConsumeQueueStoreInterface;

import java.util.HashSet;
import java.util.Map;
Expand Down Expand Up @@ -106,12 +108,18 @@ protected RemotingCommand getBrokerLiteInfo(ChannelHandlerContext ctx,
body.setCurrentLmqNum(brokerController.getMessageStore().getQueueStore().getLmqNum());
body.setLiteSubscriptionCount(brokerController.getLiteSubscriptionRegistry().getActiveSubscriptionNum());
body.setOrderInfoCount(brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().getOrderInfoCount());
body.setCqTableSize(brokerController.getMessageStore().getQueueStore().getConsumeQueueTable().size());
body.setOffsetTableSize(brokerController.getConsumerOffsetManager().getOffsetTable().size());
body.setEventMapSize(brokerController.getLiteEventDispatcher().getEventMapSize());
body.setTopicMeta(LiteMetadataUtil.getTopicTtlMap(brokerController));
body.setGroupMeta(LiteMetadataUtil.getSubscriberGroupMap(brokerController));

ConsumeQueueStoreInterface consumeQueueStore = brokerController.getMessageStore().getQueueStore();
if (consumeQueueStore instanceof CombineConsumeQueueStore
&& brokerController.getMessageStoreConfig().isCombineCQUseRocksdbForLmq()) {
consumeQueueStore = ((CombineConsumeQueueStore) consumeQueueStore).getRocksDBConsumeQueueStore(); // not null
}
body.setCqTableSize(consumeQueueStore.getConsumeQueueTable().size());

response.setBody(body.encode());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@

public class LiteTestUtil {

public static MessageStore buildMessageStore(final BrokerConfig brokerConfig,
MessageStoreConfig storeConfig, final ConcurrentMap<String, TopicConfig> topicConfigTable,
boolean isRocksDBStore) throws Exception {

BrokerStatsManager brokerStatsManager = new BrokerStatsManager(brokerConfig);
MessageStore messageStore;
if (isRocksDBStore) {
messageStore = new RocksDBMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable);
} else {
messageStore = new DefaultMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable);
}
return messageStore;
}

public static MessageStore buildMessageStore(String storePathRootDir, final BrokerConfig brokerConfig,
final ConcurrentMap<String, TopicConfig> topicConfigTable, boolean isRocksDBStore) throws Exception {
MessageStoreConfig storeConfig = new MessageStoreConfig();
Expand All @@ -51,14 +65,7 @@ public static MessageStore buildMessageStore(String storePathRootDir, final Brok
storeConfig.setEnableMultiDispatch(true);
storeConfig.setStorePathRootDir(storePathRootDir);

BrokerStatsManager brokerStatsManager = new BrokerStatsManager(brokerConfig);
MessageStore messageStore;
if (isRocksDBStore) {
messageStore = new RocksDBMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable);
} else {
messageStore = new DefaultMessageStore(storeConfig, brokerStatsManager, null, brokerConfig, topicConfigTable);
}
return messageStore;
return buildMessageStore(brokerConfig, storeConfig, topicConfigTable, isRocksDBStore);
}

public static MessageExtBrokerInner buildMessage(String parentTopic, String liteTopic) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@
import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.lite.LiteUtil;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
import org.apache.rocketmq.store.plugin.MessageStorePluginContext;
import org.apache.rocketmq.store.queue.AbstractConsumeQueueStore;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.junit.AfterClass;
import org.junit.Assert;
Expand Down Expand Up @@ -125,9 +127,11 @@ public void testInit_otherStore() {

when(brokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG);
when(brokerController.getMessageStore()).thenReturn(pluginMessageStore);
when(pluginMessageStore.getQueueStore()).thenReturn(Mockito.mock(AbstractConsumeQueueStore.class));

RocksDBLiteLifecycleManager manager = new RocksDBLiteLifecycleManager(brokerController, liteSharding);
manager.init();

Assert.assertFalse(manager.init());
Assert.assertThrows(NullPointerException.class, () -> manager.getMaxOffsetInQueue("HW"));
}

Expand Down Expand Up @@ -239,4 +243,33 @@ public void testCleanByParentTopic() throws Exception {
Assert.assertEquals(0, liteLifecycleManager.getMaxOffsetInQueue(lmqName));
}
}

@Test
public void testInit_combineConsumeQueueStore() throws Exception {
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setStorePathRootDir(
System.getProperty("java.io.tmpdir") + File.separator + "store-rocksDBLifecycleTest-" + UUID.randomUUID());
storeConfig.setRocksdbCQDoubleWriteEnable(true);
MessageStore messageStore = LiteTestUtil.buildMessageStore(BROKER_CONFIG, storeConfig, TOPIC_CONFIG_TABLE, false);
BrokerController brokerController = Mockito.mock(BrokerController.class);
LiteSharding liteSharding = Mockito.mock(LiteSharding.class);
when(brokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG);
when(brokerController.getMessageStore()).thenReturn(messageStore);

// enable
storeConfig.setCombineCQUseRocksdbForLmq(true);
RocksDBLiteLifecycleManager manager = new RocksDBLiteLifecycleManager(brokerController, liteSharding);
Assert.assertTrue(manager.init());
Assert.assertEquals(0, manager.getMaxOffsetInQueue(UUID.randomUUID().toString()));

// disable
storeConfig.setCombineCQUseRocksdbForLmq(false);
RocksDBLiteLifecycleManager manager2 = new RocksDBLiteLifecycleManager(brokerController, liteSharding);
Assert.assertFalse(manager2.init());
Assert.assertThrows(NullPointerException.class, () -> manager2.getMaxOffsetInQueue("HW"));

messageStore.shutdown();
messageStore.destroy();
UtilAll.deleteFile(new File(storeConfig.getStorePathRootDir()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,7 @@ public class MessageStoreConfig {
private String combineAssignOffsetCQType = StoreType.DEFAULT.getStoreType();
private boolean combineCQEnableCheckSelf = false;
private int combineCQMaxExtraSearchCommitLogFiles = 3;
private boolean combineCQUseRocksdbForLmq = false;

/**
* If ConsumeQueueStore is RocksDB based, this option is to configure bottom-most tier compression type.
Expand Down Expand Up @@ -520,6 +521,8 @@ public class MessageStoreConfig {
// Shared byte buffer manager configuration
private int sharedByteBufferNum = 16;

private boolean useSeparateStorePathForRocksdbCQ = false;

public String getRocksdbCompressionType() {
return rocksdbCompressionType;
}
Expand Down Expand Up @@ -2110,6 +2113,14 @@ public void setCombineCQMaxExtraSearchCommitLogFiles(int combineCQMaxExtraSearch
this.combineCQMaxExtraSearchCommitLogFiles = combineCQMaxExtraSearchCommitLogFiles;
}

public boolean isCombineCQUseRocksdbForLmq() {
return combineCQUseRocksdbForLmq;
}

public void setCombineCQUseRocksdbForLmq(boolean combineCQUseRocksdbForLmq) {
this.combineCQUseRocksdbForLmq = combineCQUseRocksdbForLmq;
}

public boolean isEnableLogConsumeQueueRepeatedlyBuildWhenRecover() {
return enableLogConsumeQueueRepeatedlyBuildWhenRecover;
}
Expand Down Expand Up @@ -2302,4 +2313,12 @@ public boolean isAppendTopicForTimerDeleteKey() {
public void setAppendTopicForTimerDeleteKey(boolean appendTopicForTimerDeleteKey) {
this.appendTopicForTimerDeleteKey = appendTopicForTimerDeleteKey;
}

public boolean isUseSeparateStorePathForRocksdbCQ() {
return useSeparateStorePathForRocksdbCQ;
}

public void setUseSeparateStorePathForRocksdbCQ(boolean useSeparateStorePathForRocksdbCQ) {
this.useSeparateStorePathForRocksdbCQ = useSeparateStorePathForRocksdbCQ;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public static String getStorePathBatchConsumeQueue(final String rootDir) {
return rootDir + File.separator + "batchconsumequeue";
}

public static String getStorePathRocksDBConsumeQueue(final String rootDir) {
return rootDir + File.separator + "consumequeue_r";
}

public static String getStorePathIndex(final String rootDir) {
return rootDir + File.separator + "index";
}
Expand Down
Loading
Loading