From 6a8478add2c67549f644569ff1244fad3c4e7f84 Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Tue, 10 Feb 2026 18:04:01 -0600 Subject: [PATCH] [AMQ-9858] Support purging the first number of messages from a queue (cherry picked from commit 428eeb5560fa2ef12a52c35df16c59ea291038dc) --- .../apache/activemq/broker/jmx/QueueView.java | 8 +++++++ .../activemq/broker/jmx/QueueViewMBean.java | 8 +++++++ .../apache/activemq/broker/region/Queue.java | 22 +++++++++++++++---- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java index 64a4c2792c2..6f7092d5a19 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java @@ -64,6 +64,14 @@ public synchronized void purge() throws Exception { LOG.info("{} purge of {} messages", destination.getActiveMQDestination().getQualifiedName(), originalMessageCount); } + public synchronized void purge(long numberOfMessages) throws Exception { + final long originalMessageCount = destination.getDestinationStatistics().getMessages().getCount(); + + ((Queue)destination).purge(numberOfMessages); + + LOG.info("{} purge of {} of {} messages", destination.getActiveMQDestination().getQualifiedName(), numberOfMessages, originalMessageCount); + } + public synchronized boolean removeMessage(String messageId) throws Exception { return ((Queue)destination).removeMessage(messageId); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java index 27ef61c9f0f..fea5edf10ff 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java @@ -70,6 +70,14 @@ public interface QueueViewMBean extends DestinationViewMBean { @MBeanInfo("Removes all of the messages in the queue.") void purge() throws Exception; + /** + * Removes the first number of messages in the queue. + * + * @throws Exception + */ + @MBeanInfo("Removes the first number of messages in the queue.") + void purge(long number) throws Exception; + /** * Copies a given message to another destination. * diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index c87b37c1b65..963dd14d732 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1313,9 +1313,15 @@ public QueueMessageReference getMessage(String id) { } public void purge() throws Exception { + purge(this.destinationStatistics.getMessages().getCount()); + } + + public void purge(long numberOfMessages) throws Exception { ConnectionContext c = createConnectionContext(); List list = null; sendLock.lock(); + + long purgeCount = 0L; try { long originalMessageCount = this.destinationStatistics.getMessages().getCount(); do { @@ -1327,20 +1333,28 @@ public void purge() throws Exception { pagedInMessagesLock.readLock().unlock(); } - for (MessageReference ref : list) { + int deleteCount = list.size(); + if ((numberOfMessages - purgeCount) < list.size()) { + deleteCount = (int)(numberOfMessages - purgeCount); + } + + for (int n=0; n < deleteCount; n++) { try { - QueueMessageReference r = (QueueMessageReference) ref; + QueueMessageReference r = (QueueMessageReference) list.get(n); removeMessage(c, r); messages.rollback(r.getMessageId()); + purgeCount++; } catch (IOException e) { } } // don't spin/hang if stats are out and there is nothing left in the // store - } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0); + } while (!list.isEmpty() && + this.destinationStatistics.getMessages().getCount() > 0 && + purgeCount < numberOfMessages); if (this.destinationStatistics.getMessages().getCount() > 0) { - LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount()); + LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), numberOfMessages, this.destinationStatistics.getMessages().getCount()); } } finally { sendLock.unlock();