Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@

public class MQClientInstance {
private final static long LOCK_TIMEOUT_MILLIS = 3000;
private final static long RESET_OFFSET_MAX_WAIT = 10;
private final static Logger log = LoggerFactory.getLogger(MQClientInstance.class);
private final ClientConfig clientConfig;
private final String clientId;
Expand Down Expand Up @@ -1380,9 +1381,11 @@ public synchronized void resetOffset(String topic, String group, Map<MessageQueu
}
}

try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException ignored) {
if (!consumer.isConsumeOrderly()) {
try {
TimeUnit.SECONDS.sleep(RESET_OFFSET_MAX_WAIT);
} catch (InterruptedException ignored) {
}
}

Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
Expand All @@ -1391,8 +1394,10 @@ public synchronized void resetOffset(String topic, String group, Map<MessageQueu
Long offset = offsetTable.get(mq);
if (topic.equals(mq.getTopic()) && offset != null) {
try {
ProcessQueue pq = processQueueTable.get(mq);
waitResetOffsetReady(consumer, pq);
consumer.updateConsumeOffset(mq, offset);
consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, pq);
iterator.remove();
} catch (Exception e) {
log.warn("reset offset failed. group={}, {}", group, mq, e);
Expand All @@ -1406,6 +1411,22 @@ public synchronized void resetOffset(String topic, String group, Map<MessageQueu
}
}

private void waitResetOffsetReady(DefaultMQPushConsumerImpl consumer, ProcessQueue pq) {
if (consumer.isConsumeOrderly()) {
Lock lock = pq.getConsumeLock().writeLock();
boolean locked = false;
try {
locked = lock.tryLock(RESET_OFFSET_MAX_WAIT, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
} finally {
if (locked) {
lock.unlock();
}
}
}
}

@SuppressWarnings("unchecked")
public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
MQConsumerInner impl = this.consumerTable.get(group);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -397,6 +402,119 @@ public void testResetOffset() throws IllegalAccessException {
eq(0L));
}

@Test
public void testResetOffsetOrderly() {
topicRouteTable.put(topic, createTopicRouteData());
brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
MessageQueue messageQueue = createMessageQueue();
ProcessQueue processQueue = new ProcessQueue();
RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
when(rebalanceImpl.removeUnnecessaryMessageQueue(eq(messageQueue), eq(processQueue)))
.thenReturn(false, false, true);
consumerTable.put(group, createMQConsumerInner(processQueue, true, rebalanceImpl));
Map<MessageQueue, Long> offsetTable = new HashMap<>();
offsetTable.put(messageQueue, 0L);

mqClientInstance.resetOffset(topic, group, offsetTable);

verify(rebalanceImpl).removeUnnecessaryMessageQueue(messageQueue, processQueue);
}

@Test
public void testResetOffsetOrderlyWhenWaitTimesOut() throws InterruptedException {
topicRouteTable.put(topic, createTopicRouteData());
brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
MessageQueue messageQueue = createMessageQueue();
ProcessQueue processQueue = mock(ProcessQueue.class);
ReadWriteLock consumeLock = mock(ReadWriteLock.class);
Lock writeLock = mock(Lock.class);
RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
when(processQueue.getConsumeLock()).thenReturn(consumeLock);
when(consumeLock.writeLock()).thenReturn(writeLock);
when(writeLock.tryLock(10, TimeUnit.SECONDS)).thenReturn(false);
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) createMQConsumerInner(processQueue, true, rebalanceImpl);
consumerTable.put(group, consumer);
Map<MessageQueue, Long> offsetTable = new HashMap<>();
offsetTable.put(messageQueue, 0L);

mqClientInstance.resetOffset(topic, group, offsetTable);

verify(consumer).updateConsumeOffset(messageQueue, 0L);
verify(rebalanceImpl).removeUnnecessaryMessageQueue(messageQueue, processQueue);
verify(writeLock, times(1)).tryLock(10, TimeUnit.SECONDS);
verify(writeLock, times(0)).unlock();
}

@Test
public void testResetOffsetOrderlyWaitsForInflightConsumptionBeforeUpdatingOffset() throws Exception {
topicRouteTable.put(topic, createTopicRouteData());
brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
MessageQueue messageQueue = createMessageQueue();
ProcessQueue processQueue = new ProcessQueue();
RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
when(rebalanceImpl.removeUnnecessaryMessageQueue(eq(messageQueue), eq(processQueue))).thenReturn(true);
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) createMQConsumerInner(processQueue, true, rebalanceImpl);
consumerTable.put(group, consumer);
Map<MessageQueue, Long> offsetTable = new HashMap<>();
offsetTable.put(messageQueue, 0L);

CountDownLatch consumeLockHeld = new CountDownLatch(1);
CountDownLatch releaseConsumeLock = new CountDownLatch(1);
CountDownLatch suspendCalled = new CountDownLatch(1);
CountDownLatch updateOffsetCalled = new CountDownLatch(1);
AtomicReference<Throwable> backgroundFailure = new AtomicReference<>();

doAnswer(invocation -> {
suspendCalled.countDown();
return null;
}).when(consumer).suspend();
doAnswer(invocation -> {
updateOffsetCalled.countDown();
return null;
}).when(consumer).updateConsumeOffset(messageQueue, 0L);

Thread consumingThread = new Thread(() -> {
processQueue.getConsumeLock().readLock().lock();
try {
consumeLockHeld.countDown();
if (!releaseConsumeLock.await(5, TimeUnit.SECONDS)) {
backgroundFailure.compareAndSet(null,
new AssertionError("Timed out while waiting to release orderly consume lock"));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
backgroundFailure.compareAndSet(null, e);
} finally {
processQueue.getConsumeLock().readLock().unlock();
}
});
Thread resetThread = new Thread(() -> {
try {
mqClientInstance.resetOffset(topic, group, offsetTable);
} catch (Throwable t) {
backgroundFailure.compareAndSet(null, t);
}
});

consumingThread.start();
assertTrue(consumeLockHeld.await(5, TimeUnit.SECONDS));

resetThread.start();
assertTrue(suspendCalled.await(5, TimeUnit.SECONDS));
assertFalse(updateOffsetCalled.await(200, TimeUnit.MILLISECONDS));

releaseConsumeLock.countDown();
consumingThread.join(5000);
resetThread.join(5000);

assertNull(backgroundFailure.get());
assertFalse(consumingThread.isAlive());
assertFalse(resetThread.isAlive());
assertTrue(updateOffsetCalled.await(1, TimeUnit.SECONDS));
verify(consumer).updateConsumeOffset(messageQueue, 0L);
verify(rebalanceImpl).removeUnnecessaryMessageQueue(messageQueue, processQueue);
}

@Test
public void testGetConsumerStatus() {
topicRouteTable.put(topic, createTopicRouteData());
Expand Down Expand Up @@ -475,17 +593,26 @@ private HashMap<Long, String> createBrokerAddrMap() {
}

private MQConsumerInner createMQConsumerInner() {
RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
when(rebalanceImpl.removeUnnecessaryMessageQueue(any(MessageQueue.class), any(ProcessQueue.class))).thenReturn(true);
return createMQConsumerInner(new ProcessQueue(), false, rebalanceImpl);
}

private MQConsumerInner createMQConsumerInner(ProcessQueue processQueue, boolean orderly, RebalanceImpl rebalanceImpl) {
ConcurrentMap<MessageQueue, ProcessQueue> processQueueMap = new ConcurrentHashMap<>();
processQueueMap.put(createMessageQueue(), processQueue);
return createMQConsumerInner(processQueueMap, orderly, rebalanceImpl);
}

private MQConsumerInner createMQConsumerInner(ConcurrentMap<MessageQueue, ProcessQueue> processQueueMap, boolean orderly, RebalanceImpl rebalanceImpl) {
DefaultMQPushConsumerImpl result = mock(DefaultMQPushConsumerImpl.class);
Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
SubscriptionData subscriptionData = mock(SubscriptionData.class);
subscriptionDataSet.add(subscriptionData);
when(result.subscriptions()).thenReturn(subscriptionDataSet);
RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
ConcurrentMap<MessageQueue, ProcessQueue> processQueueMap = new ConcurrentHashMap<>();
ProcessQueue processQueue = new ProcessQueue();
processQueueMap.put(createMessageQueue(), processQueue);
when(rebalanceImpl.getProcessQueueTable()).thenReturn(processQueueMap);
when(result.getRebalanceImpl()).thenReturn(rebalanceImpl);
when(result.isConsumeOrderly()).thenReturn(orderly);
OffsetStore offsetStore = mock(OffsetStore.class);
when(result.getOffsetStore()).thenReturn(offsetStore);
ConsumeMessageService consumeMessageService = mock(ConsumeMessageService.class);
Expand Down
Loading