From be90ac4fb9b47e942c3b15ee5c73485dc319d5dc Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Thu, 26 Mar 2026 19:24:39 -0400 Subject: [PATCH 1/2] ARTEMIS-5573 and ARTEMIS-5975 Improve AMQP Size estimation and make it static --- .../amqp/broker/AMQPLargeMessage.java | 17 +- .../protocol/amqp/broker/AMQPMessage.java | 66 +++---- .../amqp/broker/AMQPStandardMessage.java | 86 +------- .../protocol/amqp/broker/AMQPMessageTest.java | 19 +- .../amqp/AmqpClientTestSupport.java | 9 +- .../integration/amqp/AmqpEstimateTest.java | 148 ++++++++++++++ .../amqp/connect/AckManagerTest.java | 3 +- .../amqp/journal/AmqpJournalLoadingTest.java | 25 ++- .../integration/routing/ElasticQueueTest.java | 2 +- .../soak/memoryFlood/AMQPGlobalMaxTest.java | 184 ++++++++++++++++++ 10 files changed, 410 insertions(+), 149 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpEstimateTest.java create mode 100644 tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/memoryFlood/AMQPGlobalMaxTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 99dbe2d8adb..f74779185a8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -80,6 +80,8 @@ public Message getMessage() { private boolean reencoded = false; + private int applicationPropertiesSize; + /** * AMQPLargeMessagePersister will save the buffer here. */ @@ -264,7 +266,9 @@ protected void readSavedEncoding(ByteBuf buf) { applicationPropertiesPosition = buf.readInt(); remainingBodyPosition = buf.readInt(); + int applicationPropertiesInitialPosition = buf.readerIndex(); applicationProperties = (ApplicationProperties)TLSEncode.getDecoder().readObject(); + this.applicationPropertiesSize = buf.readerIndex() - applicationPropertiesInitialPosition; if (properties != null && properties.getAbsoluteExpiryTime() != null && properties.getAbsoluteExpiryTime().getTime() > 0) { if (!expirationReload) { @@ -412,6 +416,16 @@ private void genericParseLargeMessage() { } } + @Override + protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) { + applicationProperties = super.readApplicationProperties(data, position); + if (applicationProperties != null) { + this.applicationPropertiesSize = data.position() - position; + } + return applicationProperties; + } + + protected void parseLargeMessage(ReadableBuffer data) { MessageDataScanningStatus status = getDataScanningStatus(); if (status == MessageDataScanningStatus.NOT_SCANNED) { @@ -604,8 +618,7 @@ public long getWholeMessageSize() { @Override public synchronized int getMemoryEstimate() { if (memoryEstimate == -1) { - memoryEstimate = memoryOffset * 2 + (extraProperties != null ? extraProperties.getEncodeSize() : 0); - originalEstimate = memoryEstimate; + memoryEstimate = AMQP_OFFSET + (extraProperties != null ? extraProperties.getEncodeSize() : 0) + applicationPropertiesSize * 4; } return memoryEstimate; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index e078fdb2363..ce489098e89 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -44,7 +44,6 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.openmbean.CompositeDataConstants; import org.apache.activemq.artemis.core.message.openmbean.MessageOpenTypeFactory; -import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools; import org.apache.activemq.artemis.core.persistence.Persister; import org.apache.activemq.artemis.core.server.MessageReference; @@ -119,6 +118,10 @@ */ public abstract class AMQPMessage extends RefCountMessage implements org.apache.activemq.artemis.api.core.Message { + // how much an AMQP Message takes more in the memory, beyond the Message.offset. + // this is an estimate, and it's based on testing + public static final int AMQP_OFFSET = 1300; + private static final SimpleString ANNOTATION_AREA_PREFIX = SimpleString.of("m."); protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -146,7 +149,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. * developing purposes. */ public enum MessageDataScanningStatus { - NOT_SCANNED(0), RELOAD_PERSISTENCE(1), SCANNED(2); + NOT_SCANNED(0), SCANNED(1); private static final MessageDataScanningStatus[] STATES; @@ -205,7 +208,6 @@ private static void checkCode(int code) { protected long messageID; protected SimpleString address; protected volatile int memoryEstimate = -1; - protected volatile int originalEstimate = -1; protected long expiration; protected boolean expirationReload = false; protected long scheduledTime = -1; @@ -546,36 +548,27 @@ protected ApplicationProperties lazyDecodeApplicationProperties() { // need to synchronize access to lazyDecodeApplicationProperties to avoid clashes with getMemoryEstimate protected synchronized ApplicationProperties lazyDecodeApplicationProperties(ReadableBuffer data) { if (applicationProperties == null && applicationPropertiesPosition != VALUE_NOT_PRESENT) { - applicationProperties = scanForMessageSection(data, applicationPropertiesPosition, ApplicationProperties.class); - if (owner != null && memoryEstimate != -1) { - // the memory has already been tracked and needs to be updated to reflect the new decoding - int addition = unmarshalledApplicationPropertiesMemoryEstimateFromData(data); - - // it is difficult to track the updates for paged messages - // for that reason we won't do it if paged - // we also only do the update if the message was previously routed - // so if a debug method or an interceptor changed the size before routing we would get a different size - if (!isPaged && routed) { - ((PagingStore) owner).addSize(addition, false); - final int updatedEstimate = memoryEstimate + addition; - memoryEstimate = updatedEstimate; - } - } + readApplicationProperties(data, applicationPropertiesPosition); } return applicationProperties; } + protected ApplicationProperties readApplicationProperties(ReadableBuffer data, int position) { + applicationProperties = scanForMessageSection(data, position, ApplicationProperties.class); + return applicationProperties; + } + protected int unmarshalledApplicationPropertiesMemoryEstimateFromData(ReadableBuffer data) { - if (applicationProperties != null) { - // they have been unmarshalled, estimate memory usage based on their encoded size - if (remainingBodyPosition != VALUE_NOT_PRESENT) { - return remainingBodyPosition - applicationPropertiesPosition; - } else { - return data.capacity() - applicationPropertiesPosition; - } + // no need to rescan if it's from RELOAD_PERSISTENCE + ensureScanning(); + + // they have been unmarshalled, estimate memory usage based on their encoded size + if (remainingBodyPosition != VALUE_NOT_PRESENT) { + return remainingBodyPosition - applicationPropertiesPosition; + } else { + return data.capacity() - applicationPropertiesPosition; } - return 0; } @SuppressWarnings("unchecked") @@ -661,9 +654,6 @@ protected synchronized void ensureMessageDataScanned() { case NOT_SCANNED: scanMessageData(); break; - case RELOAD_PERSISTENCE: - lazyScanAfterReloadPersistence(); - break; case SCANNED: // NO-OP break; @@ -686,7 +676,6 @@ protected synchronized void resetMessageData() { priority = DEFAULT_MESSAGE_PRIORITY; encodedHeaderSize = 0; memoryEstimate = -1; - originalEstimate = -1; scheduledTime = -1; encodedDeliveryAnnotationsSize = 0; headerPosition = VALUE_NOT_PRESENT; @@ -885,12 +874,8 @@ public final void receiveBuffer(ByteBuf buffer) { @Override public int getOriginalEstimate() { - if (originalEstimate < 0) { - // getMemoryEstimate should initialize originalEstimate - return getMemoryEstimate(); - } else { - return originalEstimate; - } + // getMemoryEstimate should initialize originalEstimate + return getMemoryEstimate(); } @Override @@ -1033,13 +1018,9 @@ protected int internalPersistSize() { public abstract void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pools); protected synchronized void lazyScanAfterReloadPersistence() { - assert messageDataScanned == MessageDataScanningStatus.RELOAD_PERSISTENCE.code; scanMessageData(); messageDataScanned = MessageDataScanningStatus.SCANNED.code; modified = false; - // reinitialise memory estimate as message will already be on a queue - // and lazy decode will want to update - getMemoryEstimate(); } @Override @@ -1223,9 +1204,8 @@ public boolean isDurable() { if (header != null && header .getDurable() != null) { return header.getDurable(); } else { - // if header == null and scanningStatus=RELOAD_PERSISTENCE, it means the message can only be durable - // even though the parsing hasn't happened yet - return getDataScanningStatus() == MessageDataScanningStatus.RELOAD_PERSISTENCE; + // we will assume it's non persistent if no header + return false; } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java index c514009bcee..19cc125bfb9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java @@ -39,12 +39,8 @@ import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.codec.DecodeException; -import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.EncoderImpl; -import org.apache.qpid.proton.codec.EncodingCodes; import org.apache.qpid.proton.codec.ReadableBuffer; -import org.apache.qpid.proton.codec.TypeConstructor; import org.apache.qpid.proton.codec.WritableBuffer; // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format @@ -192,14 +188,7 @@ protected ReadableBuffer getData() { @Override public synchronized int getMemoryEstimate() { if (memoryEstimate == -1) { - if (isPaged) { - // When the message is paged, we don't take the unmarshalled application properties because it could be - // updated at different places. We just keep the estimate simple when paging. - memoryEstimate = memoryOffset + (data != null ? data.capacity() : 0); - } else { - memoryEstimate = memoryOffset + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) : 0); - } - originalEstimate = memoryEstimate; + memoryEstimate = AMQP_OFFSET + (data != null ? data.capacity() + unmarshalledApplicationPropertiesMemoryEstimateFromData(data) * 4 : 0); } return memoryEstimate; @@ -237,79 +226,10 @@ public void reloadPersistence(ActiveMQBuffer record, CoreMessageObjectPools pool // Message state is now that the underlying buffer is loaded, but the contents not yet scanned resetMessageData(); - recoverHeaderDataFromEncoding(); + scanMessageData(data); modified = false; - messageDataScanned = MessageDataScanningStatus.RELOAD_PERSISTENCE.code; - } - - private void recoverHeaderDataFromEncoding() { - final DecoderImpl decoder = TLSEncode.getDecoder(); - decoder.setBuffer(data); - - try { - // At one point the broker could write the header and delivery annotations out of order - // which means a full scan is required for maximum compatibility with that older data - // where delivery annotations could be found ahead of the Header in the encoding. - // - // We manually extract the priority from the Header encoding if present to ensure we do - // not create any unneeded GC overhead during load from storage. We don't directly store - // other values from the header except for a value that is computed based on TTL and or - // absolute expiration time in the Properties section, but that value is stored in the - // data of the persisted message. - for (int section = 0; section < 2 && data.hasRemaining(); section++) { - final TypeConstructor constructor = decoder.readConstructor(); - - if (Header.class.equals(constructor.getTypeClass())) { - final byte typeCode = data.get(); - - @SuppressWarnings("unused") - int size = 0; - int count = 0; - - switch (typeCode) { - case EncodingCodes.LIST0: - break; - case EncodingCodes.LIST8: - size = data.get() & 0xff; - count = data.get() & 0xff; - break; - case EncodingCodes.LIST32: - size = data.getInt(); - count = data.getInt(); - break; - default: - throw new DecodeException("Incorrect type found in Header encoding: " + typeCode); - } - - // Priority is stored in the second slot of the Header list encoding if present - if (count >= 2) { - decoder.readBoolean(false); // Discard durable for now, it is computed elsewhere. - - final byte encodingCode = data.get(); - final int priority = switch (encodingCode) { - case EncodingCodes.UBYTE -> data.get() & 0xff; - case EncodingCodes.NULL -> DEFAULT_MESSAGE_PRIORITY; - default -> - throw new DecodeException("Expected UnsignedByte type but found encoding: " + EncodingCodes.toString(encodingCode)); - }; - - // Scaled here so do not call setPriority as that will store the set value in the AMQP header - // and we don't want to create that Header instance at this stage. - this.priority = (byte) Math.min(priority, MAX_MESSAGE_PRIORITY); - } - - return; - } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) { - constructor.skipValue(); - } else { - return; - } - } - } finally { - decoder.setBuffer(null); - data.rewind(); // Ensure next scan start at the beginning. - } + messageDataScanned = MessageDataScanningStatus.SCANNED.code; } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java index 51228637f4d..902be6f6649 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessageTest.java @@ -216,11 +216,11 @@ public void testHasScheduledDeliveryTimeReloadPersistence() { // Now reload from encoded data message.reloadPersistence(encoded, null); - assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); + assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus()); assertTrue(message.hasScheduledDeliveryTime()); - assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); + assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus()); message.getHeader(); @@ -249,11 +249,11 @@ public void testHasScheduledDeliveryDelayReloadPersistence() { // Now reload from encoded data message.reloadPersistence(encoded, null); - assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); + assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus()); assertTrue(message.hasScheduledDeliveryTime()); - assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); + assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus()); message.getHeader(); @@ -279,11 +279,11 @@ public void testNoScheduledDeliveryTimeOrDelayReloadPersistence() { // Now reload from encoded data message.reloadPersistence(encoded, null); - assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); + assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus()); assertFalse(message.hasScheduledDeliveryTime()); - assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); + assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus()); message.getHeader(); @@ -331,12 +331,7 @@ private void testGetMemoryEstimateWithDecodedApplicationProperties(boolean paged } assertEquals(TEST_APPLICATION_PROPERTY_VALUE, decodedWithApplicationPropertiesUnmarshalled.getStringProperty(TEST_APPLICATION_PROPERTY_KEY)); - - if (paged) { - assertEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate()); - } else { - assertNotEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate()); - } + assertEquals(decodedWithApplicationPropertiesUnmarshalled.getMemoryEstimate(), decoded.getMemoryEstimate()); } //----- Test Connection ID access -----------------------------------------// diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 3e5a0bced90..3c8ae51f009 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -303,10 +303,14 @@ protected void sendMessages(String destinationName, } protected void sendMessages(String destinationName, int count, boolean durable) throws Exception { - sendMessages(destinationName, count, durable, null); + sendMessages(destinationName, count, durable, null, null); } protected void sendMessages(String destinationName, int count, boolean durable, byte[] payload) throws Exception { + sendMessages(destinationName, count, durable, payload, null); + } + + protected void sendMessages(String destinationName, int count, boolean durable, byte[] payload, Map properties) throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); try { @@ -320,6 +324,9 @@ protected void sendMessages(String destinationName, int count, boolean durable, if (payload != null) { message.setBytes(payload); } + if (properties != null) { + properties.forEach((a, b) -> message.setApplicationProperty(a, b)); + } sender.send(message); } } finally { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpEstimateTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpEstimateTest.java new file mode 100644 index 00000000000..4eef62ca61b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpEstimateTest.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import java.lang.invoke.MethodHandles; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.RandomUtil; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Test broker behavior when creating AMQP senders + */ +public class AmqpEstimateTest extends AmqpClientTestSupport { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + @Test + public void testCheckEstimates() throws Exception { + internalTest(1, 0, 0); // small messages + internalTest(1, 100, 10 * 1024); // small messages + internalTest(100, 100, 10 * 1024); // small messages + internalTest(100, 200 * 1024, 10 * 1024); // large messages + } + + public void internalTest(int messages, int bodySize, int propertySize) throws Exception { + + String queueName = getQueueName() + RandomUtil.randomUUIDString(); + + long sizeBeforeRestart; + + { // namespace for before restarting the server + org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(QueueConfiguration.of(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST)); + + assertEquals(0, serverQueue.getPagingStore().getAddressSize()); + + ConnectionFactory coreFactory = CFUtil.createConnectionFactory("CORE", "tcp://localhost:5672"); + + long coreEstimate; + + try (Connection connection = coreFactory.createConnection()) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + TextMessage message = session.createTextMessage("s".repeat(bodySize)); + if (propertySize > 0) { + message.setStringProperty("large", "a".repeat(propertySize)); + } + producer.send(message); + session.commit(); + Wait.assertTrue(() -> serverQueue.getPagingStore().getAddressSize() > 0); + coreEstimate = serverQueue.getPagingStore().getAddressSize(); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + connection.start(); + assertNotNull(consumer.receive(5000)); + session.commit(); + Wait.assertTrue(() -> serverQueue.getPagingStore().getAddressSize() == 0); + } + + ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:5672"); + + try (Connection connection = factory.createConnection()) { + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < messages; i++) { + TextMessage message = session.createTextMessage("s".repeat(bodySize)); + if (propertySize > 0) { + message.setStringProperty("large", "a".repeat(propertySize)); + } + producer.send(message); + if (i == 0) { + session.commit(); + // the size of 1 message in AMQP has to be at least the size of CORE + // this is to validate the estimate of the AMQP messages + Wait.assertTrue(() -> "Expected at least " + coreEstimate + " but got " + serverQueue.getPagingStore().getAddressSize() + " on addressSize", () -> serverQueue.getPagingStore().getAddressSize() > coreEstimate, 5000, 100); + + logger.info("CoreEstimate = {} while AMQPEstimate={}", coreEstimate, serverQueue.getPagingStore().getAddressSize()); + } + } + session.commit(); + + Wait.assertTrue(() -> serverQueue.getPagingStore().getAddressSize() > 0); + + // just the 'large' property itself is 10K, It should take that into consideration + if (propertySize > 0) { + assertTrue(serverQueue.getPagingStore().getAddressSize() > messages * (long) propertySize); + } + } + + long before = serverQueue.getPagingStore().getAddressSize(); + logger.info("Message Size {}", serverQueue.getPagingStore().getAddressSize()); + serverQueue.forEach(e -> e.getMessage().getPropertyNames()); + logger.info("Message Size after {}", serverQueue.getPagingStore().getAddressSize()); + long diff = serverQueue.getPagingStore().getAddressSize() - before; + logger.info("******************************************************************************** The difference is {}", diff); + assertEquals(0L, diff); + + sizeBeforeRestart = serverQueue.getPagingStore().getAddressSize(); + server.stop(); + } + + server.start(); + { // namespace for after restarting the server + org.apache.activemq.artemis.core.server.Queue serverQueue = server.locateQueue(queueName); + + long sizeAfterRestart = serverQueue.getPagingStore().getAddressSize(); + + assertEquals(sizeBeforeRestart, sizeAfterRestart); + + serverQueue.deleteQueue(); + } + } + +} \ No newline at end of file diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java index c237b0de439..8745970b1d2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AckManagerTest.java @@ -97,7 +97,8 @@ public void setUp() throws Exception { super.setUp(); server1 = createServer(true, createDefaultConfig(0, true), 100024, -1, -1, -1); - server1.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1).setMaxReadPageMessages(20)); + server1.getConfiguration().addAddressSetting(SNF_NAME, new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1).setMaxReadPageMessages(20).setMaxReadPageMessages(-1)); + server1.getConfiguration().addAddressSetting("#", new AddressSettings().setMaxSizeBytes(-1).setMaxSizeMessages(-1).setMaxReadPageMessages(-1).setMaxReadPageMessages(-1)); server1.getConfiguration().getAcceptorConfigurations().clear(); server1.getConfiguration().addAcceptorConfiguration("server", "tcp://localhost:61616"); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java index 37e4dd2282d..b81a5196c95 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/journal/AmqpJournalLoadingTest.java @@ -21,8 +21,11 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; @@ -38,11 +41,19 @@ public class AmqpJournalLoadingTest extends AmqpClientTestSupport { @Test - public void durableMessageDataNotScannedOnRestartTest() throws Exception { - sendMessages(getQueueName(), 1, true); + public void durableMessageDataAfterRestart() throws Exception { + Map properties = new HashMap<>(); + properties.put("largeOne", "a".repeat(10 * 1024)); + sendMessages(getQueueName(), 1, true, null, properties); + //sendMessages(getQueueName(), 1, true, null, null); final Queue queueView = getProxyToQueue(getQueueName()); Wait.assertTrue("All messages should arrive", () -> queueView.getMessageCount() == 1); + AtomicInteger messageSize = new AtomicInteger(0); + queueView.forEach( r -> { + messageSize.addAndGet(r.getMessage().getMemoryEstimate()); + }); + server.stop(); server.start(); @@ -52,15 +63,17 @@ public void durableMessageDataNotScannedOnRestartTest() throws Exception { List messageReference = new ArrayList<>(1); + AtomicInteger messageSizeAfterRestart = new AtomicInteger(0); + afterRestartQueueView.forEach((next) -> { final AMQPMessage message = (AMQPMessage)next.getMessage(); - assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); - assertTrue(message.isDurable()); - // Doing the check again in case isDurable messed up the scanning status. It should not change the status by definition - assertEquals(AMQPMessage.MessageDataScanningStatus.RELOAD_PERSISTENCE, message.getDataScanningStatus()); + assertEquals(AMQPMessage.MessageDataScanningStatus.SCANNED, message.getDataScanningStatus()); messageReference.add(message); + messageSizeAfterRestart.addAndGet(next.getMessage().getMemoryEstimate()); }); + assertEquals(messageSize.get(), messageSizeAfterRestart.get()); + assertEquals(1, messageReference.size()); AmqpClient client = createAmqpClient(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/ElasticQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/ElasticQueueTest.java index 41cc2771a8d..f404307d5f5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/ElasticQueueTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/routing/ElasticQueueTest.java @@ -421,7 +421,7 @@ public boolean validateUserAndRole(final String username, private void prepareNodesAndStartCombinedHeadTail() throws Exception { AddressSettings blockingQueue = new AddressSettings(); blockingQueue - .setMaxSizeBytes(100 * 1024) + .setMaxSizeBytes(400 * 1024) .setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL) .setSlowConsumerPolicy(SlowConsumerPolicy.KILL).setSlowConsumerThreshold(0).setSlowConsumerCheckPeriod(1) .setAutoDeleteQueues(false).setAutoDeleteAddresses(false); // so slow consumer can kick in! diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/memoryFlood/AMQPGlobalMaxTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/memoryFlood/AMQPGlobalMaxTest.java new file mode 100644 index 00000000000..cbe3c2f3067 --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/memoryFlood/AMQPGlobalMaxTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.memoryFlood; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.File; +import java.lang.invoke.MethodHandles; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.artemis.cli.commands.helper.HelperCreate; +import org.apache.activemq.artemis.tests.soak.SoakTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.FileUtil; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AMQPGlobalMaxTest extends SoakTestBase { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + public static final String QUEUE_NAME = "simpleTest"; + public static final String SERVER_NAME = "global-max-test"; + private static File serverLocation; + + private static final long MESSAGE_COUNT = 250_000; + + private Process server; + + public static void createServers(long globalMaxSize) throws Exception { + serverLocation = getFileServerLocation(SERVER_NAME); + deleteDirectory(serverLocation); + + HelperCreate cliCreateServer = helperCreate(); + cliCreateServer.setUseAIO(false).setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation); + // to speedup producers + cliCreateServer.addArgs("--no-fsync"); + cliCreateServer.addArgs("--queues", QUEUE_NAME); + cliCreateServer.addArgs("--global-max-size", String.valueOf(100 * 1024 * 1024)); + // limiting memory to make the test more predictable + cliCreateServer.addArgs("--java-memory", "256M"); + cliCreateServer.createServer(); + + FileUtil.findReplace(new File(serverLocation, "/etc/artemis.profile"), "-Xms512M", "-Xms10M -verbose:gc"); + } + + // This test is commented out for obvious reasons + // I used it to determine how many bytes each message without any body would consume + // this helps me determine + @Test + @Disabled + public void testSendUntilOME() throws Exception { + // making the broker to OME on purpose + // this is to help us calculate the size of each message + createServers(1024 * 1024 * 1024); + server = startServer(SERVER_NAME, 0, 5000); + + executeTest(false); + } + + + @Test + public void testGlobalMax() throws Exception { + createServers(100 * 1024 * 1024); + server = startServer(SERVER_NAME, 0, 5000); + + executeTest(true); + } + + private void executeTest(boolean hasBody) throws Exception { + ExecutorService service = Executors.newFixedThreadPool(1); + runAfter(service::shutdownNow); + + CountDownLatch latchDone = new CountDownLatch(1); + AtomicInteger errors = new AtomicInteger(0); + + service.execute(() -> { + try { + theTest(hasBody); + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + errors.incrementAndGet(); + } finally { + latchDone.countDown(); + } + }); + + long timeout = System.currentTimeMillis() + 120_000; + + File logLocation = new File(serverLocation, "log/artemis.log"); + do { + assertFalse(FileUtil.find(logLocation, l -> l.contains("java.lang.OutOfMemoryError")), "There was an OME"); + } while (!latchDone.await(1, TimeUnit.SECONDS) && timeout > System.currentTimeMillis()); + + assertTrue(latchDone.await(1, TimeUnit.SECONDS)); + } + + private void theTest(boolean hasBody) throws Exception { + + ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616"); + + logger.info("******************************************************************************** Sleeping"); + Thread.sleep(1000); + logger.info("******************************************************************************** Starting producers"); + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + + Message message; + if (hasBody) { + if (i % 100 == 0) { + // 10 large messages per transaction + message = session.createTextMessage("a".repeat(101 * 1024)); + message.setStringProperty("i", "a".repeat(10 * 1024)); + } else { + message = session.createMessage(); + } + message.setIntProperty("i", i); + } else { + message = session.createMessage(); + } + producer.send(message); + + if (i % 10_000 == 0) { + logger.info("sent {} out of {}", i, MESSAGE_COUNT); + session.commit(); + } + } + session.commit(); + } + + + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + connection.start(); + + for (int i = 0; i < MESSAGE_COUNT; i++) { + if (i % 1000 == 0) { + logger.info("Received {}", i); + } + Message message = consumer.receive(5000); + assertNotNull(message); + assertEquals(i, message.getIntProperty("i")); + } + assertNull(consumer.receiveNoWait()); + } + } + +} From 512644fe648989e67be63cdddc8ae2568eb13e2e Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Sun, 29 Mar 2026 17:56:11 -0400 Subject: [PATCH 2/2] NO-JIRA fixing intermittent failure on LVQTest --- .../tests/integration/server/LVQTest.java | 50 +++++++++++++++++-- 1 file changed, 46 insertions(+), 4 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java index 1888765b942..0976d6fc249 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/LVQTest.java @@ -92,6 +92,8 @@ public void testSimple() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); + assertEquals("m2", m.getBodyBuffer().readString()); } @@ -99,7 +101,7 @@ public void testSimple() throws Exception { public void testSimpleExclusive() throws Exception { ServerLocator locator = createNettyNonHALocator().setConsumerWindowSize(0); ClientSessionFactory sf = createSessionFactory(locator); - ClientSession clientSession = addClientSession(sf.createSession(false, true, true)); + ClientSession clientSession = addClientSession(sf.createSession(false, true, false)); final String EXCLUSIVE_QUEUE = "exclusiveQueue"; clientSession.createQueue(QueueConfiguration.of(EXCLUSIVE_QUEUE).setExclusive(true).setLastValue(true)); @@ -116,6 +118,7 @@ public void testSimpleExclusive() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m2", m.getBodyBuffer().readString()); } @@ -138,7 +141,7 @@ public void testSimpleRestart() throws Exception { assertEquals(1, server.locateQueue(qName1).getMessageCount()); ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0); ClientSessionFactory sf = createSessionFactory(locator); - clientSession = addClientSession(sf.createSession(false, true, true)); + clientSession = addClientSession(sf.createSession(false, true, false)); producer = clientSession.createProducer(address); ClientMessage m3 = createTextMessage(clientSession, "m3"); m3.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); @@ -151,6 +154,7 @@ public void testSimpleRestart() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m3", m.getBodyBuffer().readString()); } @@ -180,10 +184,12 @@ public void testMultipleMessages() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m3", m.getBodyBuffer().readString()); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m4", m.getBodyBuffer().readString()); assertNull(consumer.receiveImmediate()); } @@ -203,10 +209,12 @@ public void testMultipleMessagesWithoutLastValue() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("message1", m.getBodyBuffer().readString()); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("message2", m.getBodyBuffer().readString()); } @@ -237,6 +245,7 @@ public void testMultipleRollback() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m1", m.getBodyBuffer().readString()); assertNull(consumer.receiveImmediate()); clientSessionTxReceives.commit(); @@ -257,10 +266,12 @@ public void testFirstMessageReceivedButAckedAfter() throws Exception { assertNotNull(m); producer.send(m2); m.acknowledge(); + clientSession.commit(); assertEquals("m1", m.getBodyBuffer().readString()); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m2", m.getBodyBuffer().readString()); } @@ -285,6 +296,7 @@ public void testFirstMessageReceivedAndCancelled() throws Exception { assertNotNull(m); assertEquals("m2", m.getBodyBuffer().readString()); m.acknowledge(); + clientSession.commit(); m = consumer.receiveImmediate(); assertNull(m); } @@ -337,6 +349,7 @@ public void testManyMessagesReceivedAndCancelled() throws Exception { m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m6", m.getBodyBuffer().readString()); m = consumer.receiveImmediate(); assertNull(m); @@ -358,6 +371,7 @@ public void testSimpleInTx() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m2", m.getBodyBuffer().readString()); } @@ -383,10 +397,12 @@ public void testMultipleMessagesInTx() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m3", m.getBodyBuffer().readString()); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m4", m.getBodyBuffer().readString()); clientSessionTxReceives.commit(); m = consumer.receiveImmediate(); @@ -413,29 +429,35 @@ public void testMultipleMessagesInTxRollback() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m1", m.getBodyBuffer().readString()); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m2", m.getBodyBuffer().readString()); producer.send(m3); producer.send(m4); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m3", m.getBodyBuffer().readString()); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m4", m.getBodyBuffer().readString()); clientSessionTxReceives.rollback(); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m3", m.getBodyBuffer().readString()); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m4", m.getBodyBuffer().readString()); } @@ -455,6 +477,7 @@ public void testSingleTXRollback() throws Exception { m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSessionTxReceives.commit(); assertEquals("m1", m.getBodyBuffer().readString()); assertNull(consumer.receiveImmediate()); } @@ -488,6 +511,7 @@ public void testMultipleMessagesInTxSend() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSessionTxSends.commit(); assertEquals("m" + i, m.getBodyBuffer().readString()); } consumer.close(); @@ -495,6 +519,7 @@ public void testMultipleMessagesInTxSend() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSessionTxSends.commit(); assertEquals("m6", m.getBodyBuffer().readString()); } @@ -531,9 +556,11 @@ public void testMultipleMessagesPersistedCorrectly() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m6", m.getBodyBuffer().readString()); m = consumer.receiveImmediate(); assertNull(m); + clientSession.commit(); } @Test @@ -559,10 +586,15 @@ public void testMultipleMessagesPersistedCorrectlyInTx() throws Exception { m6.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); m6.setDurable(true); producer.send(m1); + clientSessionTxSends.commit(); producer.send(m2); + clientSessionTxSends.commit(); producer.send(m3); + clientSessionTxSends.commit(); producer.send(m4); + clientSessionTxSends.commit(); producer.send(m5); + clientSessionTxSends.commit(); producer.send(m6); clientSessionTxSends.commit(); clientSessionTxSends.start(); @@ -605,31 +637,37 @@ public void testMultipleAcksPersistedCorrectly() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m1", m.getBodyBuffer().readString()); producer.send(m2); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m2", m.getBodyBuffer().readString()); producer.send(m3); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m3", m.getBodyBuffer().readString()); producer.send(m4); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m4", m.getBodyBuffer().readString()); producer.send(m5); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m5", m.getBodyBuffer().readString()); producer.send(m6); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m6", m.getBodyBuffer().readString()); assertEquals(0, queue.getDeliveringCount()); @@ -646,6 +684,7 @@ public void testRemoveMessageThroughManagement() throws Exception { m1.putStringProperty(Message.HDR_LAST_VALUE_NAME, rh); m1.setDurable(true); producer.send(m1); + clientSession.commit(); queue.deleteAllReferences(); @@ -655,6 +694,7 @@ public void testRemoveMessageThroughManagement() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m1", m.getBodyBuffer().readString()); assertEquals(0, queue.getDeliveringCount()); @@ -713,11 +753,13 @@ public void testMultipleAcksPersistedCorrectly2() throws Exception { ClientMessage m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m1", m.getBodyBuffer().readString()); producer.send(m2); m = consumer.receive(1000); assertNotNull(m); m.acknowledge(); + clientSession.commit(); assertEquals("m2", m.getBodyBuffer().readString()); assertEquals(0, queue.getDeliveringCount()); @@ -956,9 +998,9 @@ public void setUp() throws Exception { ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0); ClientSessionFactory sf = createSessionFactory(locator); - clientSession = addClientSession(sf.createSession(false, true, true)); + clientSession = addClientSession(sf.createSession(false, true, false)); clientSessionTxReceives = addClientSession(sf.createSession(false, true, false)); - clientSessionTxSends = addClientSession(sf.createSession(false, false, true)); + clientSessionTxSends = addClientSession(sf.createSession(false, false, false)); clientSession.createQueue(QueueConfiguration.of(qName1).setAddress(address)); } }