From 307bbfd979d0a615b437784aeeb0441041860284 Mon Sep 17 00:00:00 2001
From: Javier Aliaga
Date: Tue, 14 Apr 2026 11:18:20 +0200
Subject: [PATCH 1/3] fix: detect shutdown and break DurableTaskGrpcWorker loop
The worker loop ran `while (true)` and only exited if an
InterruptedException happened during the 5-second retry sleep.
If `close()` was called while the gRPC stream was blocking, the
CANCELLED exception was logged but the loop kept retrying.
- Replace `while (true)` with a check on `isNormalShutdown` and
the thread interrupt flag so the loop exits promptly.
- Break out of the retry path on CANCELLED when `isNormalShutdown`
is set, avoiding a misleading 5-second sleep after `close()`.
- Re-set the interrupt flag before breaking on InterruptedException
to preserve the interrupt contract for callers higher up.
Signed-off-by: Javier Aliaga
---
.../java/io/dapr/durabletask/DurableTaskGrpcWorker.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
index 12f89bd727..d4db1a4cc6 100644
--- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
+++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
@@ -171,7 +171,7 @@ public void startAndBlock() {
this.dataConverter,
logger);
- while (true) {
+ while (!this.isNormalShutdown && !Thread.currentThread().isInterrupted()) {
try {
OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = OrchestratorService.GetWorkItemsRequest
.newBuilder().build();
@@ -210,6 +210,9 @@ public void startAndBlock() {
this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
+ if (this.isNormalShutdown) {
+ break;
+ }
} else {
logger.log(Level.WARNING,
String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
@@ -219,6 +222,7 @@ public void startAndBlock() {
try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
break;
}
}
From e00a33e2cc6e67377278df1848bf11f6739f0852 Mon Sep 17 00:00:00 2001
From: Javier Aliaga
Date: Tue, 14 Apr 2026 12:27:09 +0200
Subject: [PATCH 2/3] =?UTF-8?q?fix:=20address=20PR=20review=20=E2=80=94=20?=
=?UTF-8?q?skip=20retry=20sleep=20on=20shutdown,=20add=20tests?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- Capture workerThread in startAndBlock() so close() can interrupt
the thread even when startAndBlock() is called directly (not via
start()), fixing the case where the 5s sleep blocks shutdown.
- Add isNormalShutdown guard before the retry sleep so any exception
code (UNAVAILABLE, CANCELLED, etc.) exits promptly during shutdown.
- Add DurableTaskGrpcWorkerShutdownTest with 3 scenarios:
- start() + close() terminates the worker thread promptly
- startAndBlock() on a separate thread exits on close()
- startAndBlock() exits on thread interrupt
Signed-off-by: Javier Aliaga
---
.../durabletask/DurableTaskGrpcWorker.java | 5 +
.../DurableTaskGrpcWorkerShutdownTest.java | 128 ++++++++++++++++++
2 files changed, 133 insertions(+)
create mode 100644 durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
index d4db1a4cc6..2ebc7de476 100644
--- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
+++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
@@ -158,6 +158,7 @@ public void close() {
* interrupt signal.
*/
public void startAndBlock() {
+ this.workerThread = Thread.currentThread();
logger.log(Level.INFO, "Durable Task worker is connecting to sidecar at {0}.", this.getSidecarAddress());
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
@@ -218,6 +219,10 @@ public void startAndBlock() {
String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
}
+ if (this.isNormalShutdown) {
+ break;
+ }
+
// Retry after 5 seconds
try {
Thread.sleep(5000);
diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
new file mode 100644
index 0000000000..aa7a45606f
--- /dev/null
+++ b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2025 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.durabletask;
+
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Unit tests for DurableTaskGrpcWorker shutdown behavior.
+ */
+public class DurableTaskGrpcWorkerShutdownTest {
+
+ /**
+ * Verifies that calling close() on a worker that was started via start()
+ * causes the worker thread to terminate promptly (within a bounded time),
+ * rather than hanging in the retry loop.
+ */
+ @Test
+ void workerThreadTerminatesPromptlyOnClose() throws Exception {
+ // Use an arbitrary port where no sidecar is running — the worker will
+ // enter the retry loop (UNAVAILABLE → sleep 5s → retry).
+ DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder()
+ .port(19876)
+ .build();
+
+ worker.start();
+
+ // Give the worker thread time to enter the retry loop
+ Thread.sleep(500);
+
+ Instant before = Instant.now();
+ worker.close();
+
+ // Wait for the worker thread to finish — the join is bounded so the
+ // test doesn't hang if the fix regresses.
+ Thread workerThread = getWorkerThread(worker);
+ if (workerThread != null) {
+ workerThread.join(Duration.ofSeconds(3).toMillis());
+ assertFalse(workerThread.isAlive(),
+ "Worker thread should have terminated after close()");
+ }
+
+ Duration elapsed = Duration.between(before, Instant.now());
+ assertTrue(elapsed.toMillis() < 3000,
+ "close() should return promptly, but took " + elapsed.toMillis() + "ms");
+ }
+
+ /**
+ * Verifies that calling close() on a worker that was started via
+ * startAndBlock() on a separate thread terminates that thread promptly.
+ */
+ @Test
+ void startAndBlockExitsOnClose() throws Exception {
+ DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder()
+ .port(19877)
+ .build();
+
+ Thread blockingThread = new Thread(worker::startAndBlock);
+ blockingThread.start();
+
+ // Give the blocking thread time to enter the retry loop
+ Thread.sleep(500);
+
+ Instant before = Instant.now();
+ worker.close();
+
+ blockingThread.join(Duration.ofSeconds(3).toMillis());
+ assertFalse(blockingThread.isAlive(),
+ "startAndBlock() thread should have terminated after close()");
+
+ Duration elapsed = Duration.between(before, Instant.now());
+ assertTrue(elapsed.toMillis() < 3000,
+ "close() should terminate startAndBlock() promptly, but took " + elapsed.toMillis() + "ms");
+ }
+
+ /**
+ * Verifies that interrupting the thread running startAndBlock() causes it
+ * to exit and preserves the interrupt status.
+ */
+ @Test
+ void startAndBlockExitsOnInterrupt() throws Exception {
+ DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder()
+ .port(19878)
+ .build();
+
+ Thread blockingThread = new Thread(worker::startAndBlock);
+ blockingThread.start();
+
+ // Give the blocking thread time to enter the retry loop
+ Thread.sleep(500);
+
+ blockingThread.interrupt();
+ blockingThread.join(Duration.ofSeconds(3).toMillis());
+
+ assertFalse(blockingThread.isAlive(),
+ "startAndBlock() thread should have exited after interrupt");
+
+ worker.close();
+ }
+
+ private Thread getWorkerThread(DurableTaskGrpcWorker worker) {
+ try {
+ java.lang.reflect.Field f = DurableTaskGrpcWorker.class.getDeclaredField("workerThread");
+ f.setAccessible(true);
+ return (Thread) f.get(worker);
+ } catch (Exception e) {
+ return null;
+ }
+ }
+}
From 04b7a117759101ad05b0a385e3ceac1393f55e36 Mon Sep 17 00:00:00 2001
From: Javier Aliaga
Date: Tue, 14 Apr 2026 15:33:47 +0200
Subject: [PATCH 3/3] fix: address second round of PR review comments
- Mark workerThread as volatile for cross-thread visibility
- Remove unused imports (ManagedChannel, ManagedChannelBuilder)
- Fail test explicitly when reflection fails instead of silently
returning null
- Assert interrupt status is preserved in startAndBlockExitsOnInterrupt
Signed-off-by: Javier Aliaga
---
.../durabletask/DurableTaskGrpcWorker.java | 8 ++++----
.../DurableTaskGrpcWorkerShutdownTest.java | 18 ++++++++++--------
2 files changed, 14 insertions(+), 12 deletions(-)
diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
index 2ebc7de476..89fe85c8af 100644
--- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
+++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
@@ -64,7 +64,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
private final boolean isExecutorServiceManaged;
private volatile boolean isNormalShutdown = false;
- private Thread workerThread;
+ private volatile Thread workerThread;
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
this.orchestrationFactories = builder.orchestrationFactories;
@@ -178,6 +178,9 @@ public void startAndBlock() {
.newBuilder().build();
Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
+ if (this.isNormalShutdown || Thread.currentThread().isInterrupted()) {
+ break;
+ }
OrchestratorService.WorkItem workItem = workItemStream.next();
OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase();
@@ -211,9 +214,6 @@ public void startAndBlock() {
this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
- if (this.isNormalShutdown) {
- break;
- }
} else {
logger.log(Level.WARNING,
String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
index aa7a45606f..71368af4be 100644
--- a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
+++ b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
@@ -13,15 +13,15 @@
package io.dapr.durabletask;
-import io.grpc.ManagedChannel;
-import io.grpc.ManagedChannelBuilder;
import org.junit.jupiter.api.Test;
import java.time.Duration;
import java.time.Instant;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
/**
* Unit tests for DurableTaskGrpcWorker shutdown behavior.
@@ -52,11 +52,10 @@ void workerThreadTerminatesPromptlyOnClose() throws Exception {
// Wait for the worker thread to finish — the join is bounded so the
// test doesn't hang if the fix regresses.
Thread workerThread = getWorkerThread(worker);
- if (workerThread != null) {
- workerThread.join(Duration.ofSeconds(3).toMillis());
- assertFalse(workerThread.isAlive(),
- "Worker thread should have terminated after close()");
- }
+ assertNotNull(workerThread, "Worker thread should be accessible via reflection");
+ workerThread.join(Duration.ofSeconds(3).toMillis());
+ assertFalse(workerThread.isAlive(),
+ "Worker thread should have terminated after close()");
Duration elapsed = Duration.between(before, Instant.now());
assertTrue(elapsed.toMillis() < 3000,
@@ -112,6 +111,8 @@ void startAndBlockExitsOnInterrupt() throws Exception {
assertFalse(blockingThread.isAlive(),
"startAndBlock() thread should have exited after interrupt");
+ assertTrue(blockingThread.isInterrupted(),
+ "Interrupt status should be preserved after startAndBlock() exits");
worker.close();
}
@@ -122,7 +123,8 @@ private Thread getWorkerThread(DurableTaskGrpcWorker worker) {
f.setAccessible(true);
return (Thread) f.get(worker);
} catch (Exception e) {
- return null;
+ fail("Failed to access workerThread field via reflection: " + e.getMessage());
+ return null; // unreachable
}
}
}