Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public String getValidatedUser() {
}

public void destroy() {
handler.runLater(() -> connectionCallback.close());
handler.runNow(() -> connectionCallback.close());
}

public boolean isSyncOnFlush() {
Expand Down Expand Up @@ -806,7 +806,7 @@ public void onTransportError(Transport transport) throws Exception {
transport.getCondition().getDescription() : "Unknown Internal Error";

// Cleanup later after the any pending work gets sent to the remote via an IO flush.
runLater(() -> connectionCallback.getProtonConnectionDelegate().fail(new ActiveMQAMQPInternalErrorException(errorMessage)));
runNow(() -> connectionCallback.getProtonConnectionDelegate().fail(new ActiveMQAMQPInternalErrorException(errorMessage)));
}

@Override
Expand Down Expand Up @@ -872,7 +872,7 @@ public void onRemoteOpen(Session session) throws Exception {

@Override
public void onRemoteClose(Session session) throws Exception {
handler.runLater(() -> {
handler.runNow(() -> {
session.close();
session.free();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void writeBytes(MessageReference messageReference) {
}

if (closed) {
throw new IllegalStateException("Cannot write to an AMQP Large Message Writer that has been closed");
return;
}

if (sessionSPI.invokeOutgoing(message, (ActiveMQProtonRemotingConnection) sessionSPI.getTransportConnection().getProtocolConnection()) != null) {
Expand All @@ -173,6 +173,11 @@ private void tryDelivering() {
logger.trace("AMQP Large Message Writer was closed before queued write attempt was executed");
return;
}
if (protonSender.getSession().getConnection().getRemoteState() != EndpointState.ACTIVE || protonSender.getSession().getConnection().getLocalState() != EndpointState.ACTIVE) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code actually already has several checks for closed state related to the sender but in reality even this is not a guarantee that you won't see this error as the proton code was not meant to be used outside the thread that manages the engine so the visibility of these values is not ensured to be updated in this delivering thread.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens is this:

We need to check the isClosed from the session / connection / link somehow. or we need to make the onRemoteClose to also close the Writer.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just amended to another possible fix..

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW: The thread being used here is the netty thread, as this is started with connection.runLater calls. so this is in the same thread.

logger.debug("connection was closed either remotely or locally, the delivery cannot be completed");
return;
}


// This is discounting some bytes due to Transfer payload
final int frameSize = protonSender.getSession().getConnection().getTransport().getOutboundFrameSizeLimit() - 50 - (delivery.getTag() != null ? delivery.getTag().length : 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
// MessageReferences are sent to the Connection executor (Netty Loop)
// as a result the returning references have to be done later after they
// had their chance to finish and clear the runnable
connection.runLater(() -> {
connection.runNow(() -> {
try {
protonSession.removeSender(sender);
if (brokerConsumer != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ public void testLargeMessageUsageLoweredOnCloseWhenWriteNotCompleted() throws Ex

verify(message).usageDown();
verify(protonSender).getSession();
verify(protonDelivery).getTag();
verify(protonSender, atLeastOnce()).getLocalState();

verifyNoMoreInteractions(reference);
Expand Down Expand Up @@ -246,7 +245,6 @@ public void testTryDeliveringRunAfterClosedDoesNotThrow() throws Exception {
verify(message).usageDown();
verify(reference).getMessage();
verify(protonSender).getSession();
verify(protonDelivery).getTag();
verify(protonSender, atLeastOnce()).getLocalState();

verifyNoMoreInteractions(reference);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.jms.BytesMessage;
Expand All @@ -51,6 +55,7 @@
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
Expand Down Expand Up @@ -83,12 +88,14 @@
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.ExtendWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.atomic.AtomicInteger;

@ExtendWith(ParameterizedTestExtension.class)
public class AmqpLargeMessageTest extends AmqpClientTestSupport {
Expand Down Expand Up @@ -1177,6 +1184,147 @@ public void testSimpleLargeMessageRestart() throws Exception {
}


@Test
public void testInterruptStreaming() throws Exception {
ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:61616");

final int MESSAGE_COUNT = 20;
final int MESSAGE_SIZE = 1500000; // 1.5 MB
final int THREADS = 10;
byte[] payload = createLargePayload(MESSAGE_SIZE);

try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler(true)) {

final CyclicBarrier consumersReady = new CyclicBarrier(THREADS + 1);
final CountDownLatch consumersDone = new CountDownLatch(THREADS);
final CountDownLatch producersDone = new CountDownLatch(THREADS);
final AtomicInteger sentCount = new AtomicInteger(0);
final AtomicInteger countError = new AtomicInteger(0);

ExecutorService executorService = Executors.newFixedThreadPool(THREADS * 2);
runAfter(executorService::shutdownNow);

for (int i = 0; i < THREADS; i++) {
final int threadId = i;
executorService.execute(() -> {
try {
consumeForInterruptedStreaming(factory, threadId, consumersReady, MESSAGE_COUNT, payload);
} catch (Throwable error) {
countError.incrementAndGet();
logger.warn(error.getMessage(), error);
} finally {
consumersDone.countDown();
}
});
}

consumersReady.await(30, TimeUnit.SECONDS);

for (int i = 0; i < THREADS; i++) {
final int threadId = i;
executorService.execute(() -> {
try {
produceForInterruptedStreaming(factory, sentCount, MESSAGE_COUNT, payload);
} catch (Throwable e) {
countError.incrementAndGet();
logger.warn(e.getMessage(), e);
} finally {
producersDone.countDown();
}
});
}

assertTrue(consumersDone.await(5, TimeUnit.MINUTES));
assertTrue(producersDone.await(5, TimeUnit.MINUTES));

assertEquals(0, countError.get(), "There are exceptions on the producers or consumers");

assertFalse(loggerHandler.findTrace("IllegalArgumentException"), "Server log contains IllegalArgumentException");
}
}

private static void produceForInterruptedStreaming(ConnectionFactory factory, AtomicInteger sentCount, int messageCount, byte[] payload) throws Throwable {
Connection connection = null;
try {
connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("testTopic");
MessageProducer producer = session.createProducer(topic);


for (int j = 0; j < messageCount; j++) {
BytesMessage message = session.createBytesMessage();
message.writeBytes(payload);
producer.send(message);
sentCount.incrementAndGet();

if ((j + 1) % 10 == 0) {
session.commit();
}
}
session.commit();
} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
logger.error("Error closing producer connection", e);
}
}
}
}


private static void consumeForInterruptedStreaming(ConnectionFactory factory,
int threadId,
CyclicBarrier startFlag, int messageCount, byte[] expectedPayload) throws Exception {

Connection connection = null;
try {
connection = factory.createConnection();
connection.setClientID("consumer-" + threadId);

Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Topic topic = session.createTopic("testTopic");

// Create durable subscriber
MessageConsumer consumer = session.createDurableSubscriber(topic, "sub-" + threadId);

startFlag.await(10, TimeUnit.SECONDS);
connection.start();

for (int i = 0; i < messageCount; i++) {
BytesMessage msg = (BytesMessage) consumer.receive(TimeUnit.SECONDS.toMillis(30));
assertNotNull(msg);

int bodySize = (int)msg.getBodyLength();

assertEquals(bodySize, expectedPayload.length);

byte[] receivedPayLoad = new byte[bodySize];
msg.readBytes(receivedPayLoad);

assertArrayEquals(expectedPayload, receivedPayLoad);

if (i % 10 == 0) {
session.commit();
}
}
session.commit();

} finally {
if (connection != null) {
try {
connection.close();
} catch (Exception e) {
logger.error("Error closing consumer connection", e);
}
}
}
}



private void sendObjectMessages(int nMsgs, ConnectionFactory factory) throws Exception {
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession();
Expand Down
Loading