diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
index 12f89bd727..89fe85c8af 100644
--- a/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
+++ b/durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java
@@ -64,7 +64,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
private final boolean isExecutorServiceManaged;
private volatile boolean isNormalShutdown = false;
- private Thread workerThread;
+ private volatile Thread workerThread;
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
this.orchestrationFactories = builder.orchestrationFactories;
@@ -158,6 +158,7 @@ public void close() {
* interrupt signal.
*/
public void startAndBlock() {
+ this.workerThread = Thread.currentThread();
logger.log(Level.INFO, "Durable Task worker is connecting to sidecar at {0}.", this.getSidecarAddress());
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
@@ -171,12 +172,15 @@ public void startAndBlock() {
this.dataConverter,
logger);
- while (true) {
+ while (!this.isNormalShutdown && !Thread.currentThread().isInterrupted()) {
try {
OrchestratorService.GetWorkItemsRequest getWorkItemsRequest = OrchestratorService.GetWorkItemsRequest
.newBuilder().build();
Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
+ if (this.isNormalShutdown || Thread.currentThread().isInterrupted()) {
+ break;
+ }
OrchestratorService.WorkItem workItem = workItemStream.next();
OrchestratorService.WorkItem.RequestCase requestType = workItem.getRequestCase();
@@ -215,10 +219,15 @@ public void startAndBlock() {
String.format("Unexpected failure connecting to %s", this.getSidecarAddress()), e);
}
+ if (this.isNormalShutdown) {
+ break;
+ }
+
// Retry after 5 seconds
try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
break;
}
}
diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
new file mode 100644
index 0000000000..71368af4be
--- /dev/null
+++ b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskGrpcWorkerShutdownTest.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2025 The Dapr Authors
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package io.dapr.durabletask;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.time.Instant;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Unit tests for DurableTaskGrpcWorker shutdown behavior.
+ */
+public class DurableTaskGrpcWorkerShutdownTest {
+
+ /**
+ * Verifies that calling close() on a worker that was started via start()
+ * causes the worker thread to terminate promptly (within a bounded time),
+ * rather than hanging in the retry loop.
+ */
+ @Test
+ void workerThreadTerminatesPromptlyOnClose() throws Exception {
+ // Use an arbitrary port where no sidecar is running — the worker will
+ // enter the retry loop (UNAVAILABLE → sleep 5s → retry).
+ DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder()
+ .port(19876)
+ .build();
+
+ worker.start();
+
+ // Give the worker thread time to enter the retry loop
+ Thread.sleep(500);
+
+ Instant before = Instant.now();
+ worker.close();
+
+ // Wait for the worker thread to finish — the join is bounded so the
+ // test doesn't hang if the fix regresses.
+ Thread workerThread = getWorkerThread(worker);
+ assertNotNull(workerThread, "Worker thread should be accessible via reflection");
+ workerThread.join(Duration.ofSeconds(3).toMillis());
+ assertFalse(workerThread.isAlive(),
+ "Worker thread should have terminated after close()");
+
+ Duration elapsed = Duration.between(before, Instant.now());
+ assertTrue(elapsed.toMillis() < 3000,
+ "close() should return promptly, but took " + elapsed.toMillis() + "ms");
+ }
+
+ /**
+ * Verifies that calling close() on a worker that was started via
+ * startAndBlock() on a separate thread terminates that thread promptly.
+ */
+ @Test
+ void startAndBlockExitsOnClose() throws Exception {
+ DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder()
+ .port(19877)
+ .build();
+
+ Thread blockingThread = new Thread(worker::startAndBlock);
+ blockingThread.start();
+
+ // Give the blocking thread time to enter the retry loop
+ Thread.sleep(500);
+
+ Instant before = Instant.now();
+ worker.close();
+
+ blockingThread.join(Duration.ofSeconds(3).toMillis());
+ assertFalse(blockingThread.isAlive(),
+ "startAndBlock() thread should have terminated after close()");
+
+ Duration elapsed = Duration.between(before, Instant.now());
+ assertTrue(elapsed.toMillis() < 3000,
+ "close() should terminate startAndBlock() promptly, but took " + elapsed.toMillis() + "ms");
+ }
+
+ /**
+ * Verifies that interrupting the thread running startAndBlock() causes it
+ * to exit and preserves the interrupt status.
+ */
+ @Test
+ void startAndBlockExitsOnInterrupt() throws Exception {
+ DurableTaskGrpcWorker worker = new DurableTaskGrpcWorkerBuilder()
+ .port(19878)
+ .build();
+
+ Thread blockingThread = new Thread(worker::startAndBlock);
+ blockingThread.start();
+
+ // Give the blocking thread time to enter the retry loop
+ Thread.sleep(500);
+
+ blockingThread.interrupt();
+ blockingThread.join(Duration.ofSeconds(3).toMillis());
+
+ assertFalse(blockingThread.isAlive(),
+ "startAndBlock() thread should have exited after interrupt");
+ assertTrue(blockingThread.isInterrupted(),
+ "Interrupt status should be preserved after startAndBlock() exits");
+
+ worker.close();
+ }
+
+ private Thread getWorkerThread(DurableTaskGrpcWorker worker) {
+ try {
+ java.lang.reflect.Field f = DurableTaskGrpcWorker.class.getDeclaredField("workerThread");
+ f.setAccessible(true);
+ return (Thread) f.get(worker);
+ } catch (Exception e) {
+ fail("Failed to access workerThread field via reflection: " + e.getMessage());
+ return null; // unreachable
+ }
+ }
+}