-
Notifications
You must be signed in to change notification settings - Fork 55
Fix Continue-as-new Race Condition at InProcessTestHost #703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -815,7 +815,8 @@ async Task<GrpcOrchestratorExecutionResult> ITaskExecutor.ExecuteOrchestrator( | |
| totalBytes += ev.CalculateSize(); | ||
| } | ||
|
|
||
| if (this.supportsHistoryStreaming && totalBytes > (1024 * 1024)) | ||
| const int HistoryStreamingThresholdBytes = 1024 * 1024; // 1 MiB | ||
| if (this.supportsHistoryStreaming && totalBytes > HistoryStreamingThresholdBytes) | ||
| { | ||
| workRequest.RequiresHistoryStreaming = true; | ||
| // Store past events to serve via StreamInstanceHistory | ||
|
|
@@ -901,8 +902,7 @@ async Task SendWorkItemToClientAsync(P.WorkItem workItem) | |
| lock (this.isConnectedSignal) | ||
| { | ||
| outputStream = this.workerToClientStream ?? | ||
| // CA2201: Use specific exception types | ||
| throw new InvalidOperationException("No client is connected. Need to wait until a client connects before executing."); | ||
| throw new RpcException(new Status(StatusCode.Unavailable, "No client is connected.")); | ||
| } | ||
|
|
||
| // The gRPC channel can only handle one message at a time, so we need to serialize access to it. | ||
|
|
@@ -911,6 +911,22 @@ async Task SendWorkItemToClientAsync(P.WorkItem workItem) | |
| { | ||
| await outputStream.WriteAsync(workItem); | ||
| } | ||
| catch (InvalidOperationException ex) when (ex.Message.Contains("request is complete", StringComparison.OrdinalIgnoreCase)) | ||
| { | ||
| // The client disconnected or canceled the GetWorkItems stream. | ||
| // Reset the connection state so the dispatcher pauses naturally | ||
| // (via the traffic signal) until a new client connects. | ||
| lock (this.isConnectedSignal) | ||
| { | ||
| this.workerToClientStream = null; | ||
| this.isConnectedSignal.Reset(); | ||
| } | ||
|
|
||
| // Must throw so callers (ExecuteOrchestrator/ExecuteActivity) can clean up | ||
| // their pending TCS. The dispatcher catches this, abandons the work item, | ||
| // and releases it back to the queue for retry. | ||
| throw new OperationCanceledException("Work-item stream closed by client.", ex); | ||
| } | ||
|
Comment on lines
+914
to
+929
|
||
| finally | ||
| { | ||
| this.sendWorkItemLock.Release(); | ||
|
|
@@ -919,7 +935,7 @@ async Task SendWorkItemToClientAsync(P.WorkItem workItem) | |
|
|
||
| TaskCompletionSource<GrpcOrchestratorExecutionResult> CreateTaskCompletionSourceForOrchestrator(string instanceId) | ||
| { | ||
| TaskCompletionSource<GrpcOrchestratorExecutionResult> tcs = new(); | ||
| TaskCompletionSource<GrpcOrchestratorExecutionResult> tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); | ||
| this.pendingOrchestratorTasks.TryAdd(instanceId, tcs); | ||
| return tcs; | ||
| } | ||
|
|
@@ -933,7 +949,7 @@ void RemoveOrchestratorTaskCompletionSource(string instanceId) | |
| TaskCompletionSource<ActivityExecutionResult> CreateTaskCompletionSourceForActivity(string instanceId, int taskId) | ||
| { | ||
| string taskIdKey = GetTaskIdKey(instanceId, taskId); | ||
| TaskCompletionSource<ActivityExecutionResult> tcs = new(); | ||
| TaskCompletionSource<ActivityExecutionResult> tcs = new(TaskCreationOptions.RunContinuationsAsynchronously); | ||
| this.pendingActivityTasks.TryAdd(taskIdKey, tcs); | ||
| return tcs; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,256 @@ | ||
| // Copyright (c) Microsoft Corporation. | ||
| // Licensed under the MIT License. | ||
|
|
||
| using Microsoft.DurableTask; | ||
| using Microsoft.DurableTask.Client; | ||
| using Microsoft.DurableTask.Testing; | ||
| using Microsoft.DurableTask.Worker; | ||
| using Xunit; | ||
| using Xunit.Abstractions; | ||
|
|
||
|
bachuv marked this conversation as resolved.
|
||
| namespace InProcessTestHost.Tests; | ||
|
|
||
| /// <summary> | ||
| /// Tests that orchestrations using ContinueAsNew with activity calls and timers | ||
| /// resume correctly after each iteration and eventually complete. | ||
| /// </summary> | ||
| public class ContinueAsNewTests | ||
| { | ||
| readonly ITestOutputHelper output; | ||
|
|
||
| public ContinueAsNewTests(ITestOutputHelper output) | ||
| { | ||
| this.output = output; | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Registers a polling-style orchestrator and a stateful activity. | ||
| /// | ||
| /// TestOrchestrator: | ||
| /// 1. Calls TestActivity with current state | ||
| /// 2. Updates state from activity result | ||
| /// 3. If not closed: waits 1s timer, then ContinueAsNew with updated state | ||
| /// 4. If closed: orchestration ends | ||
| /// | ||
| /// TestActivity: | ||
| /// - First call: sets status to InProgress | ||
| /// - Second call: sets status to Succeeded (triggers orchestration completion) | ||
| /// </summary> | ||
| static void RegisterTestFunctions(DurableTaskRegistry tasks) | ||
| { | ||
| tasks.AddOrchestratorFunc<AsyncOperation>("TestOrchestrator", async (context, input) => | ||
| { | ||
| var result = await context.CallActivityAsync<AsyncOperation>("TestActivity", input); | ||
| input.Update(result); | ||
|
|
||
| if (!input.Closed) | ||
| { | ||
| await context.CreateTimer(TimeSpan.FromSeconds(1), CancellationToken.None); | ||
| context.ContinueAsNew(input); | ||
| return; | ||
| } | ||
|
bachuv marked this conversation as resolved.
|
||
| }); | ||
|
|
||
| tasks.AddActivityFunc<AsyncOperation, AsyncOperation>("TestActivity", (context, input) => | ||
| { | ||
| if (input.Status == Status.InProgress) | ||
| { | ||
| input.Status = Status.Succeeded; | ||
| } | ||
| else | ||
| { | ||
| input.Status = Status.InProgress; | ||
| } | ||
|
bachuv marked this conversation as resolved.
Dismissed
|
||
|
|
||
| return input; | ||
| }); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Verifies that a single orchestration calling an activity, waiting on a timer, | ||
| /// and then using ContinueAsNew completes after 2 iterations without hanging. | ||
| /// Covers the basic ContinueAsNew lifecycle: activity call -> state update -> | ||
| /// timer -> ContinueAsNew -> activity call -> completion. | ||
| /// </summary> | ||
| [Fact(Timeout = 30_000)] | ||
| public async Task Orchestration_ContinueAsNew_WithActivityAndTimer_CompletesSuccessfully() | ||
| { | ||
| await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions); | ||
|
|
||
| var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 }; | ||
| string instanceId = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input); | ||
|
|
||
| using CancellationTokenSource cts = new(TimeSpan.FromSeconds(20)); | ||
| OrchestrationMetadata metadata = await host.Client.WaitForInstanceCompletionAsync( | ||
| instanceId, getInputsAndOutputs: true, cts.Token); | ||
|
|
||
| Assert.NotNull(metadata); | ||
| Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Runs 10 ContinueAsNew orchestrations in parallel, repeated across 3 rounds | ||
| /// with a fresh host each round. Validates that activity completion messages | ||
| /// are correctly delivered under contention when many orchestrations compete | ||
| /// for the dispatcher, ready-to-run queue, and instance locks simultaneously. | ||
| /// Total: 30 orchestration instances across 3 rounds. | ||
| /// </summary> | ||
| [Fact(Timeout = 60_000)] | ||
| public async Task ConcurrentOrchestrations_ContinueAsNew_AllComplete() | ||
| { | ||
| const int orchestrationCount = 10; | ||
| const int rounds = 3; | ||
|
|
||
| for (int round = 0; round < rounds; round++) | ||
| { | ||
| this.output.WriteLine($"=== Round {round + 1}/{rounds} ==="); | ||
|
|
||
| await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions); | ||
|
|
||
| string[] instanceIds = new string[orchestrationCount]; | ||
| for (int i = 0; i < orchestrationCount; i++) | ||
| { | ||
| var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 }; | ||
| instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input); | ||
| } | ||
|
|
||
| using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30)); | ||
|
|
||
| Task<OrchestrationMetadata>[] waitTasks = instanceIds | ||
| .Select(id => host.Client.WaitForInstanceCompletionAsync( | ||
| id, getInputsAndOutputs: true, cts.Token)) | ||
| .ToArray(); | ||
|
|
||
| OrchestrationMetadata[] results = await Task.WhenAll(waitTasks); | ||
|
|
||
| for (int i = 0; i < orchestrationCount; i++) | ||
| { | ||
| Assert.NotNull(results[i]); | ||
| Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus); | ||
| } | ||
|
|
||
| this.output.WriteLine($"Round {round + 1}: all {orchestrationCount} completed"); | ||
| } | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Schedules 20 ContinueAsNew orchestrations on a single host as fast as possible. | ||
| /// All 20 share the same dispatcher and ready-to-run queue for their entire lifecycle, | ||
| /// maximizing interleaving of activity completion messages and ContinueAsNew | ||
| /// re-schedules across instances. | ||
| /// </summary> | ||
| [Fact(Timeout = 60_000)] | ||
| public async Task RapidFire_SequentialScheduling_ContinueAsNew_AllComplete() | ||
| { | ||
| const int orchestrationCount = 20; | ||
|
|
||
| await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions); | ||
|
|
||
| string[] instanceIds = new string[orchestrationCount]; | ||
| for (int i = 0; i < orchestrationCount; i++) | ||
| { | ||
| var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 }; | ||
| instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input); | ||
| } | ||
|
|
||
| this.output.WriteLine($"Scheduled {orchestrationCount} orchestrations on a single host"); | ||
|
|
||
| using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30)); | ||
|
|
||
| Task<OrchestrationMetadata>[] waitTasks = instanceIds | ||
| .Select(id => host.Client.WaitForInstanceCompletionAsync( | ||
| id, getInputsAndOutputs: true, cts.Token)) | ||
| .ToArray(); | ||
|
|
||
| OrchestrationMetadata[] results = await Task.WhenAll(waitTasks); | ||
|
|
||
| for (int i = 0; i < orchestrationCount; i++) | ||
| { | ||
| Assert.NotNull(results[i]); | ||
| Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus); | ||
| } | ||
|
|
||
| this.output.WriteLine($"All {orchestrationCount} completed"); | ||
| } | ||
|
|
||
| /// <summary> | ||
| /// Schedules 30 ContinueAsNew orchestrations in 3 waves of 10, with 100ms delays | ||
| /// between waves. The staggered scheduling creates overlapping lifecycle phases — | ||
| /// earlier orchestrations may be mid-ContinueAsNew (waiting on timer or activity) | ||
| /// when later waves arrive, producing varied timing patterns that exercise the | ||
| /// interplay between lock release, message delivery, and queue re-scheduling. | ||
| /// </summary> | ||
| [Fact(Timeout = 60_000)] | ||
| public async Task Staggered_ContinueAsNew_AllComplete() | ||
| { | ||
| const int wavesCount = 3; | ||
| const int perWave = 10; | ||
|
|
||
| await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions); | ||
|
|
||
| List<string> allInstanceIds = new(); | ||
|
|
||
| for (int wave = 0; wave < wavesCount; wave++) | ||
| { | ||
| for (int i = 0; i < perWave; i++) | ||
| { | ||
| var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 }; | ||
| string id = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input); | ||
| allInstanceIds.Add(id); | ||
| } | ||
|
|
||
| this.output.WriteLine($"Wave {wave + 1}/{wavesCount}: scheduled {perWave} orchestrations"); | ||
| await Task.Delay(100); | ||
| } | ||
|
|
||
| this.output.WriteLine($"Total scheduled: {allInstanceIds.Count}"); | ||
|
|
||
| using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30)); | ||
|
|
||
| Task<OrchestrationMetadata>[] waitTasks = allInstanceIds | ||
| .Select(id => host.Client.WaitForInstanceCompletionAsync( | ||
| id, getInputsAndOutputs: true, cts.Token)) | ||
| .ToArray(); | ||
|
|
||
| OrchestrationMetadata[] results = await Task.WhenAll(waitTasks); | ||
|
|
||
| for (int i = 0; i < results.Length; i++) | ||
| { | ||
| Assert.NotNull(results[i]); | ||
| Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus); | ||
| } | ||
|
|
||
| this.output.WriteLine($"All {allInstanceIds.Count} completed"); | ||
| } | ||
|
|
||
| #region Models | ||
|
|
||
| public enum Status | ||
| { | ||
| NotStarted, | ||
| InProgress, | ||
| Succeeded, | ||
| } | ||
|
|
||
| public class AsyncOperation | ||
| { | ||
| public Status Status { get; set; } | ||
|
|
||
| public bool Closed { get; set; } | ||
|
|
||
| public int IterationCount { get; set; } | ||
|
|
||
| public void Update(AsyncOperation result) | ||
| { | ||
| this.Status = result.Status; | ||
| this.IterationCount++; | ||
|
|
||
| if (this.Status == Status.Succeeded) | ||
| { | ||
| this.Closed = true; | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #endregion | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throwing `OperationCanceledException` from gRPC server code can surface to callers as an unexpected/opaque gRPC status (often `Unknown`) unless it is consistently translated upstream. Since this is explicitly a client-disconnect/stream-cancel scenario, consider throwing an `RpcException` with an appropriate status code (e.g., `Cancelled` or `Unavailable`) so the failure mode is explicit and consistent with the earlier `No client is connected` path.