Skip to content

Commit 4d746d4

Browse files
committed
fix: remove redundant awaitingFinalEvent clear and clarify counter logic
The awaitingFinalEvent flag is already cleared in internalEnqueueItem when the final event arrives in the MainQueue. The dequeue-side clearing in ChildQueue.dequeueEventItem was a redundant no-op on the normal path and has been removed to avoid misleading future maintainers. Also clarifies the pollTimeoutsWhileAwaitingFinal reset logic in EventConsumer by documenting all three cases of the if/else-if block: - awaitingFinal && queueSize == 0: increment counter, give up at MAX - awaitingFinal && queueSize > 0: implicit fall-through, do nothing - !awaitingFinal: reset counter (distinct from the successful-dequeue reset) Signed-off-by: Emmanuel Hugonnet <ehugonne@redhat.com>
1 parent 3f877a7 commit 4d746d4

1 file changed

Lines changed: 17 additions & 6 deletions

File tree

server-common/src/main/java/io/a2a/server/events/EventConsumer.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,14 @@ public class EventConsumer {
3131
private static final int QUEUE_WAIT_MILLISECONDS = 500;
3232
// In replicated scenarios, events can arrive hundreds of milliseconds after local agent completes
3333
// Grace period allows Kafka replication to deliver late-arriving events
34-
// 3 timeouts * 500ms = 1500ms grace period for replication delays
34+
// Calculation: MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED * QUEUE_WAIT_MILLISECONDS = 1500ms
3535
private static final int MAX_POLL_TIMEOUTS_AFTER_AGENT_COMPLETED = 3;
3636
// Maximum time to wait for final event when awaitingFinalEvent is set
3737
// If event doesn't arrive after this many timeouts, assume it won't arrive
38-
// 6 timeouts * 500ms = 3000ms maximum wait for final event arrival
39-
private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL = 6;
38+
// Calculation: MAX_POLL_TIMEOUTS_AWAITING_FINAL * QUEUE_WAIT_MILLISECONDS = 3000ms
39+
private static final int MAX_AWAITING_FINAL_TIMEOUT_MS = 3000;
40+
private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL =
41+
MAX_AWAITING_FINAL_TIMEOUT_MS / QUEUE_WAIT_MILLISECONDS;
4042

4143
public EventConsumer(EventQueue queue) {
4244
this.queue = queue;
@@ -108,7 +110,14 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
108110
// proceed with normal timeout logic to prevent infinite waiting.
109111
boolean isInterruptedState = lastSeenTaskState != null && lastSeenTaskState.isInterrupted();
110112

111-
// Track how long we've been waiting for the final event
113+
// Track how long we've been waiting for the final event.
114+
// Three cases for the awaiting counter:
115+
// awaitingFinal && queueSize == 0: final event enqueued in MainQueue but not yet
116+
// distributed here — increment timeout counter and give up after MAX timeout.
117+
// awaitingFinal && queueSize > 0: events are still in transit, do nothing —
118+
// the counter is reset below once an event is successfully dequeued.
119+
// !awaitingFinal: not waiting for anything — reset the counter (timeout case;
120+
// the successful-dequeue reset happens below at the event-received path).
112121
if (awaitingFinal && queueSize == 0) {
113122
pollTimeoutsWhileAwaitingFinal++;
114123
if (pollTimeoutsWhileAwaitingFinal >= MAX_POLL_TIMEOUTS_AWAITING_FINAL) {
@@ -118,8 +127,10 @@ public Flow.Publisher<EventQueueItem> consumeAll() {
118127
queue.clearAwaitingFinalEvent();
119128
awaitingFinal = false; // Also update local variable for this iteration
120129
}
121-
} else {
122-
pollTimeoutsWhileAwaitingFinal = 0; // Reset when event arrives or queue not awaiting
130+
} else if (!awaitingFinal) {
131+
// Poll timed out and we are not awaiting a final event: reset the counter.
132+
// (The successful-dequeue reset is handled separately below.)
133+
pollTimeoutsWhileAwaitingFinal = 0;
123134
}
124135

125136
if (agentCompleted && queueSize == 0 && !isInterruptedState && !awaitingFinal) {

0 commit comments

Comments
 (0)