Summary
- Context: `SseSupport` is a utility component used by `ChatController` and `GuidedLearningController` to prepare and multicast Server-Sent Event (SSE) streams for LLM responses.
- Bug: The `prepareDataStream` method uses `publish().autoConnect(2)` to handle multiple internal subscribers, which incorrectly traps cancellation signals and prevents them from reaching the upstream source.
- Actual vs. expected: When a client disconnects (e.g., by closing a browser tab), the upstream LLM streaming request continues to completion in the background. It should instead propagate the cancellation to the LLM provider to stop token consumption and free server resources.
- Impact: This leads to a resource leak and unnecessary costs from the LLM provider, as "zombie" requests continue to stream and process data that will never be delivered or saved.
Code with bug
```java
public Flux<String> prepareDataStream(Flux<String> source, Consumer<String> chunkConsumer) {
    return source.filter(chunk -> chunk != null && !chunk.isEmpty())
            .bufferTimeout(STREAM_CHUNK_COALESCE_MAX_ITEMS, Duration.ofMillis(STREAM_CHUNK_COALESCE_WINDOW_MS))
            .filter(chunkBatch -> !chunkBatch.isEmpty())
            .map(chunkBatch -> String.join("", chunkBatch))
            .doOnNext(chunk -> chunkConsumer.accept(chunk))
            .onBackpressureBuffer(
                    STREAM_BACKPRESSURE_BUFFER_CAPACITY,
                    this::recordDroppedCoalescedChunk,
                    BufferOverflowStrategy.DROP_OLDEST)
            // Two subscribers consume this stream in controllers:
            // 1) text event emission, 2) heartbeat termination signal.
            // autoConnect(2) prevents a race where one subscriber could miss the first chunks.
            .publish()
            .autoConnect(2); // <-- BUG 🔴 [Cancellation from downstream is ignored by autoConnect]
}
```
Evidence
1. Verification of autoConnect Cancellation Behavior
A test case confirms that autoConnect(n) does not propagate cancellation to the upstream source when all subscribers have disposed of their subscriptions, unlike refCount(n).
```java
@Test
void testAutoConnectDoesNotCancelUpstream() {
    AtomicBoolean cancelled = new AtomicBoolean(false);
    Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
    Flux<Integer> source = sink.asFlux()
            .doOnCancel(() -> cancelled.set(true));

    Flux<Integer> shared = source.publish().autoConnect(2);

    var sub1 = shared.subscribe();
    var sub2 = shared.subscribe();
    sub1.dispose();
    sub2.dispose();

    // Passes: the upstream is NOT cancelled, confirming autoConnect traps the cancellation.
    assertFalse(cancelled.get(), "Upstream should NOT be cancelled with autoConnect");
}
```
2. Analysis of Chat Termination Logic
In `ChatController`, the `fullResponse` is accumulated via a `chunkConsumer` passed to `prepareDataStream`. This consumer is called inside a `doOnNext` operator located before the `publish()` point.
When a client disconnects:
- The HTTP response Flux is cancelled.
- The `merge` operator in the controller cancels `dataEvents` and `heartbeats`.
- These subscribers disconnect from the `autoConnect` Flux.
- Because `autoConnect` is used, the connection to the `source` (the LLM stream) remains active. `doOnNext` continues to fire, appending chunks to `fullResponse` and consuming LLM tokens.
- The final `doOnComplete` block in `ChatController` is never reached because the main chain was cancelled, so the accumulated `fullResponse` is discarded instead of being saved to memory, despite the full cost of the request being paid.
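The controller code itself is not reproduced in this issue, but the failure mode above can be demonstrated with a minimal, self-contained sketch. All names are illustrative (the `interval` stands in for the LLM stream, and the two plain subscriptions stand in for the data-event and heartbeat subscribers):

```java
import java.time.Duration;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

public class ZombieStreamDemo {
    public static void main(String[] args) throws InterruptedException {
        StringBuilder fullResponse = new StringBuilder();

        // Stand-in for the LLM stream: emits a token every 50 ms.
        // doOnNext sits BEFORE publish(), like the chunkConsumer in prepareDataStream.
        Flux<String> llmStream = Flux.interval(Duration.ofMillis(50))
                .map(i -> "tok" + i)
                .doOnNext(fullResponse::append);

        Flux<String> shared = llmStream.publish().autoConnect(2);

        // Two subscribers, as in the controller: data events + heartbeat signal.
        Disposable dataEvents = shared.subscribe();
        Disposable heartbeatSignal = shared.subscribe();

        Thread.sleep(120);
        // Client disconnects: both downstream subscriptions are cancelled.
        dataEvents.dispose();
        heartbeatSignal.dispose();

        int lengthAtDisconnect = fullResponse.length();
        Thread.sleep(200);
        // With autoConnect, doOnNext keeps firing after the disconnect:
        // the "zombie" stream keeps accumulating tokens nobody will read.
        System.out.println("grew after disconnect: " + (fullResponse.length() > lengthAtDisconnect));
    }
}
```

Running this prints `grew after disconnect: true`, matching the analysis: the shared connection outlives both subscribers, so the accumulator keeps growing at full token cost.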
Why has this bug gone undetected?
This bug is silent and does not cause visible errors in the UI. In a development environment, the extra few seconds of background processing and the slightly higher LLM token usage are easily overlooked. Additionally, if a user cancels a request, they typically don't check if the partial message was saved to their history, or they assume it wasn't saved because they cancelled it.
Recommended fix
Replace `autoConnect(2)` with `refCount(2)`. `refCount(2)` provides the same protection against the "first chunks" race by waiting for two subscribers before connecting, but it correctly propagates cancellation upstream when the subscriber count drops to zero. (By default `refCount` disconnects immediately at zero subscribers; the `refCount(2, Duration)` overload can add a grace period if brief reconnects should not tear down the stream.)
```java
.publish()
.refCount(2); // <-- FIX 🟢 [Correctly propagates cancellation to upstream source]
```
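As a counterpart to the evidence test above, a small standalone sketch (illustrative, not from the codebase) confirms that `refCount(2)` releases the upstream once both subscribers dispose:

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class RefCountCancellationDemo {
    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        Sinks.Many<Integer> sink = Sinks.many().multicast().onBackpressureBuffer();
        Flux<Integer> source = sink.asFlux().doOnCancel(() -> cancelled.set(true));

        // Same sharing setup as the fix: connect once 2 subscribers arrive,
        // disconnect (cancelling upstream) when the count drops to zero.
        Flux<Integer> shared = source.publish().refCount(2);

        Disposable sub1 = shared.subscribe();
        Disposable sub2 = shared.subscribe();
        sub1.dispose();
        sub2.dispose();

        // Unlike autoConnect(2), the upstream cancellation fires here.
        System.out.println("cancelled=" + cancelled.get());
    }
}
```

This prints `cancelled=true`, the mirror image of the failing `autoConnect` expectation in the evidence section.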