Merged
2 changes: 1 addition & 1 deletion src/InProcessTestHost/InProcessTestHost.csproj
@@ -5,7 +5,7 @@
<RootNamespace>Microsoft.DurableTask.Testing</RootNamespace>
<AssemblyName>Microsoft.DurableTask.InProcessTestHost</AssemblyName>
<PackageId>Microsoft.DurableTask.InProcessTestHost</PackageId>
<Version>0.2.1-preview.1</Version>
<Version>0.2.2-preview.1</Version>

<!-- Suppress CA1848: Use LoggerMessage delegates for high-performance logging scenarios -->
<NoWarn>$(NoWarn);CA1848</NoWarn>
26 changes: 21 additions & 5 deletions src/InProcessTestHost/Sidecar/Grpc/TaskHubGrpcServer.cs
@@ -815,7 +815,8 @@ async Task<GrpcOrchestratorExecutionResult> ITaskExecutor.ExecuteOrchestrator(
totalBytes += ev.CalculateSize();
}

if (this.supportsHistoryStreaming && totalBytes > (1024))
const int HistoryStreamingThresholdBytes = 1024 * 1024; // 1 MiB
if (this.supportsHistoryStreaming && totalBytes > HistoryStreamingThresholdBytes)
{
workRequest.RequiresHistoryStreaming = true;
// Store past events to serve via StreamInstanceHistory
@@ -901,8 +902,7 @@ async Task SendWorkItemToClientAsync(P.WorkItem workItem)
lock (this.isConnectedSignal)
{
outputStream = this.workerToClientStream ??
// CA2201: Use specific exception types
throw new InvalidOperationException("No client is connected. Need to wait until a client connects before executing.");
throw new RpcException(new Status(StatusCode.Unavailable, "No client is connected."));
}

// The gRPC channel can only handle one message at a time, so we need to serialize access to it.
@@ -911,6 +911,22 @@ async Task SendWorkItemToClientAsync(P.WorkItem workItem)
{
await outputStream.WriteAsync(workItem);
}
catch (InvalidOperationException ex) when (ex.Message.Contains("request is complete", StringComparison.OrdinalIgnoreCase))
{
// The client disconnected or canceled the GetWorkItems stream.
// Reset the connection state so the dispatcher pauses naturally
// (via the traffic signal) until a new client connects.
lock (this.isConnectedSignal)
{
this.workerToClientStream = null;
this.isConnectedSignal.Reset();
}

// Must throw so callers (ExecuteOrchestrator/ExecuteActivity) can clean up
// their pending TCS. The dispatcher catches this, abandons the work item,
// and releases it back to the queue for retry.
throw new OperationCanceledException("Work-item stream closed by client.", ex);
Copilot AI Apr 15, 2026


Throwing OperationCanceledException from gRPC server code can surface to callers as an unexpected/opaque gRPC status (often Unknown) unless it is consistently translated upstream. Since this is explicitly a client-disconnect/stream-cancel scenario, consider throwing an RpcException with an appropriate status code (e.g., Cancelled or Unavailable) so the failure mode is explicit and consistent with the earlier "No client is connected" path.

Suggested change
throw new OperationCanceledException("Work-item stream closed by client.", ex);
throw new RpcException(new Status(StatusCode.Cancelled, "Work-item stream closed by client."));

}
Comment on lines +914 to +929
Copilot AI Apr 15, 2026


Catching InvalidOperationException based on a substring of the exception message is brittle (message text can change across runtime/framework versions and may vary). Prefer matching on exception types that represent stream cancellation/disconnect (commonly RpcException with Cancelled/Unavailable, or IOException/ObjectDisposedException depending on the gRPC stack), or use a more reliable signal from the server call context if available. This will make the disconnect handling more stable long-term.

Comment on lines +914 to +929
Copilot AI Apr 15, 2026


Catching InvalidOperationException based on a substring match of ex.Message is brittle (message text can change across framework versions/locales and is not a stable contract). Prefer handling stream-closure in a way that doesn’t depend on message text (e.g., catch InvalidOperationException without a message filter and/or handle additional common stream write failures like ObjectDisposedException/IOException, then treat them uniformly as a disconnected client).

Member


I agree with this comment, but I will not block on this. If the message text changes, the new tests should catch this. We should look for other ways to detect this condition: perhaps the exception object has more clues in other properties, or we can infer this from other context. However, this can be addressed later -- I recommend filing a separate follow-up issue now, but no need to block releasing the fix.
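Along the lines of the suggestions above, a type-based disconnect check could replace the message-substring filter. The sketch below is hypothetical, not the PR's actual code: the `IsClientDisconnect` helper is invented for illustration, and `RpcException` is left out so the sample compiles without a gRPC dependency.

```csharp
using System;
using System.IO;

// Hypothetical helper: classify stream-write failures by exception *type*
// rather than by message text. In the real server code, RpcException with
// StatusCode.Cancelled/Unavailable would likely be matched here as well.
static bool IsClientDisconnect(Exception ex) => ex switch
{
    ObjectDisposedException => true,   // stream disposed underneath us
    IOException => true,               // transport-level failure
    InvalidOperationException => true, // e.g. write attempted after the response completed
    _ => false,
};

Console.WriteLine(IsClientDisconnect(new InvalidOperationException("request is complete"))); // True
Console.WriteLine(IsClientDisconnect(new ArgumentNullException("workItem")));                // False
```

A catch filter like `catch (Exception ex) when (IsClientDisconnect(ex))` would then treat all of these uniformly as a disconnected client, without depending on message text that may change across runtime versions or locales.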

finally
{
this.sendWorkItemLock.Release();
@@ -919,7 +935,7 @@ async Task SendWorkItemToClientAsync(P.WorkItem workItem)

TaskCompletionSource<GrpcOrchestratorExecutionResult> CreateTaskCompletionSourceForOrchestrator(string instanceId)
{
TaskCompletionSource<GrpcOrchestratorExecutionResult> tcs = new();
TaskCompletionSource<GrpcOrchestratorExecutionResult> tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
this.pendingOrchestratorTasks.TryAdd(instanceId, tcs);
return tcs;
}
Expand All @@ -933,7 +949,7 @@ void RemoveOrchestratorTaskCompletionSource(string instanceId)
TaskCompletionSource<ActivityExecutionResult> CreateTaskCompletionSourceForActivity(string instanceId, int taskId)
{
string taskIdKey = GetTaskIdKey(instanceId, taskId);
TaskCompletionSource<ActivityExecutionResult> tcs = new();
TaskCompletionSource<ActivityExecutionResult> tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
this.pendingActivityTasks.TryAdd(taskIdKey, tcs);
return tcs;
}
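The `TaskCreationOptions.RunContinuationsAsynchronously` additions in this hunk change where awaiter continuations run. The following standalone demonstration (hypothetical, not the PR's code) shows the difference: with a default `TaskCompletionSource`, `TrySetResult` can execute a synchronous continuation inline on the completing thread, blocking the completer until the continuation finishes, which is a deadlock risk if the continuation needs a lock the completer holds.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Returns true if the continuation executed on the same thread that
// completed the TCS (i.e., it ran inline inside TrySetResult).
static bool ContinuationRanOnCompleterThread(TaskCompletionSource<int> tcs)
{
    int completerThread = -1, continuationThread = -2;
    using var done = new ManualResetEventSlim();

    tcs.Task.ContinueWith(
        _ => { continuationThread = Environment.CurrentManagedThreadId; done.Set(); },
        TaskContinuationOptions.ExecuteSynchronously);

    var completer = new Thread(() =>
    {
        completerThread = Environment.CurrentManagedThreadId;
        tcs.TrySetResult(42); // default TCS: continuation may run right here
    });
    completer.Start();
    completer.Join();
    done.Wait();
    return completerThread == continuationThread;
}

Console.WriteLine(ContinuationRanOnCompleterThread(new TaskCompletionSource<int>()));
Console.WriteLine(ContinuationRanOnCompleterThread(
    new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously)));
```

With `RunContinuationsAsynchronously`, the continuation is forced onto the thread pool, so the dispatcher thread that calls `TrySetResult` returns immediately instead of running arbitrary awaiting code.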
@@ -742,7 +742,7 @@ public Task<OrchestrationState> WaitForInstanceAsync(string instanceId, Cancella
// First, add the waiter before checking completion to avoid a race condition.
// This ensures we don't miss a completion notification that happens between
// checking the status and adding the waiter.
var tcs = this.waiters.GetOrAdd(instanceId, _ => new TaskCompletionSource<OrchestrationState>());
var tcs = this.waiters.GetOrAdd(instanceId, _ => new TaskCompletionSource<OrchestrationState>(TaskCreationOptions.RunContinuationsAsynchronously));

// Now check if already completed - if so, complete the waiter immediately
if (this.store.TryGetValue(instanceId, out SerializedInstanceState? state))
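The comment in this hunk describes a register-then-check pattern for avoiding lost completion notifications. The sketch below is a standalone illustration with invented names (`WaitForCompletionAsync`, `OnCompleted`), not the PR's actual implementation: the waiter is added to the dictionary *before* the completion store is consulted, so a completion that lands between the two steps is always observed by one side or the other.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var waiters = new ConcurrentDictionary<string, TaskCompletionSource<string>>();
var store = new ConcurrentDictionary<string, string>();

Task<string> WaitForCompletionAsync(string instanceId)
{
    // 1. Register the waiter first.
    var tcs = waiters.GetOrAdd(
        instanceId,
        _ => new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously));

    // 2. Only then check whether completion already happened; if it did,
    //    complete the waiter immediately.
    if (store.TryGetValue(instanceId, out string? state))
    {
        tcs.TrySetResult(state);
    }

    return tcs.Task;
}

void OnCompleted(string instanceId, string state)
{
    store[instanceId] = state; // publish the result first
    if (waiters.TryRemove(instanceId, out var tcs))
    {
        tcs.TrySetResult(state); // then wake any registered waiter
    }
}

Task<string> pending = WaitForCompletionAsync("abc");
OnCompleted("abc", "Completed");
Console.WriteLine(await pending); // Completed
```

If the order were reversed (check store, then register the waiter), a completion arriving in between would find no waiter to signal, and the waiter would then block on a task nobody completes.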
256 changes: 256 additions & 0 deletions test/InProcessTestHost.Tests/ContinueAsNewTests.cs
@@ -0,0 +1,256 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

using Microsoft.DurableTask;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Testing;
using Microsoft.DurableTask.Worker;
using Xunit;
using Xunit.Abstractions;

bachuv marked this conversation as resolved.
namespace InProcessTestHost.Tests;

/// <summary>
/// Tests that orchestrations using ContinueAsNew with activity calls and timers
/// resume correctly after each iteration and eventually complete.
/// </summary>
public class ContinueAsNewTests
{
readonly ITestOutputHelper output;

public ContinueAsNewTests(ITestOutputHelper output)
{
this.output = output;
}

/// <summary>
/// Registers a polling-style orchestrator and a stateful activity.
///
/// TestOrchestrator:
/// 1. Calls TestActivity with current state
/// 2. Updates state from activity result
/// 3. If not closed: waits 1s timer, then ContinueAsNew with updated state
/// 4. If closed: orchestration ends
///
/// TestActivity:
/// - First call: sets status to InProgress
/// - Second call: sets status to Succeeded (triggers orchestration completion)
/// </summary>
static void RegisterTestFunctions(DurableTaskRegistry tasks)
{
tasks.AddOrchestratorFunc<AsyncOperation>("TestOrchestrator", async (context, input) =>
{
var result = await context.CallActivityAsync<AsyncOperation>("TestActivity", input);
input.Update(result);

if (!input.Closed)
{
await context.CreateTimer(TimeSpan.FromSeconds(1), CancellationToken.None);
context.ContinueAsNew(input);
return;
}
bachuv marked this conversation as resolved.
});

tasks.AddActivityFunc<AsyncOperation, AsyncOperation>("TestActivity", (context, input) =>
{
if (input.Status == Status.InProgress)
{
input.Status = Status.Succeeded;
}
else
{
input.Status = Status.InProgress;
}
bachuv marked this conversation as resolved.

return input;
});
}

/// <summary>
/// Verifies that a single orchestration calling an activity, waiting on a timer,
/// and then using ContinueAsNew completes after 2 iterations without hanging.
/// Covers the basic ContinueAsNew lifecycle: activity call -> state update ->
/// timer -> ContinueAsNew -> activity call -> completion.
/// </summary>
[Fact(Timeout = 30_000)]
public async Task Orchestration_ContinueAsNew_WithActivityAndTimer_CompletesSuccessfully()
{
await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions);

var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 };
string instanceId = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input);

using CancellationTokenSource cts = new(TimeSpan.FromSeconds(20));
OrchestrationMetadata metadata = await host.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, cts.Token);

Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
}

/// <summary>
/// Runs 10 ContinueAsNew orchestrations in parallel, repeated across 3 rounds
/// with a fresh host each round. Validates that activity completion messages
/// are correctly delivered under contention when many orchestrations compete
/// for the dispatcher, ready-to-run queue, and instance locks simultaneously.
/// Total: 30 orchestration instances across 3 rounds.
/// </summary>
[Fact(Timeout = 60_000)]
public async Task ConcurrentOrchestrations_ContinueAsNew_AllComplete()
{
const int orchestrationCount = 10;
const int rounds = 3;

for (int round = 0; round < rounds; round++)
{
this.output.WriteLine($"=== Round {round + 1}/{rounds} ===");

await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions);

string[] instanceIds = new string[orchestrationCount];
for (int i = 0; i < orchestrationCount; i++)
{
var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 };
instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input);
}

using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30));

Task<OrchestrationMetadata>[] waitTasks = instanceIds
.Select(id => host.Client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: true, cts.Token))
.ToArray();

OrchestrationMetadata[] results = await Task.WhenAll(waitTasks);

for (int i = 0; i < orchestrationCount; i++)
{
Assert.NotNull(results[i]);
Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus);
}

this.output.WriteLine($"Round {round + 1}: all {orchestrationCount} completed");
}
}

/// <summary>
/// Schedules 20 ContinueAsNew orchestrations on a single host as fast as possible.
/// All 20 share the same dispatcher and ready-to-run queue for their entire lifecycle,
/// maximizing interleaving of activity completion messages and ContinueAsNew
/// re-schedules across instances.
/// </summary>
[Fact(Timeout = 60_000)]
public async Task RapidFire_SequentialScheduling_ContinueAsNew_AllComplete()
{
const int orchestrationCount = 20;

await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions);

string[] instanceIds = new string[orchestrationCount];
for (int i = 0; i < orchestrationCount; i++)
{
var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 };
instanceIds[i] = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input);
}

this.output.WriteLine($"Scheduled {orchestrationCount} orchestrations on a single host");

using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30));

Task<OrchestrationMetadata>[] waitTasks = instanceIds
.Select(id => host.Client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: true, cts.Token))
.ToArray();

OrchestrationMetadata[] results = await Task.WhenAll(waitTasks);

for (int i = 0; i < orchestrationCount; i++)
{
Assert.NotNull(results[i]);
Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus);
}

this.output.WriteLine($"All {orchestrationCount} completed");
}

/// <summary>
/// Schedules 30 ContinueAsNew orchestrations in 3 waves of 10, with 100ms delays
/// between waves. The staggered scheduling creates overlapping lifecycle phases —
/// earlier orchestrations may be mid-ContinueAsNew (waiting on timer or activity)
/// when later waves arrive, producing varied timing patterns that exercise the
/// interplay between lock release, message delivery, and queue re-scheduling.
/// </summary>
[Fact(Timeout = 60_000)]
public async Task Staggered_ContinueAsNew_AllComplete()
{
const int wavesCount = 3;
const int perWave = 10;

await using DurableTaskTestHost host = await DurableTaskTestHost.StartAsync(RegisterTestFunctions);

List<string> allInstanceIds = new();

for (int wave = 0; wave < wavesCount; wave++)
{
for (int i = 0; i < perWave; i++)
{
var input = new AsyncOperation { Status = Status.NotStarted, Closed = false, IterationCount = 0 };
string id = await host.Client.ScheduleNewOrchestrationInstanceAsync("TestOrchestrator", input);
allInstanceIds.Add(id);
}

this.output.WriteLine($"Wave {wave + 1}/{wavesCount}: scheduled {perWave} orchestrations");
await Task.Delay(100);
}

this.output.WriteLine($"Total scheduled: {allInstanceIds.Count}");

using CancellationTokenSource cts = new(TimeSpan.FromSeconds(30));

Task<OrchestrationMetadata>[] waitTasks = allInstanceIds
.Select(id => host.Client.WaitForInstanceCompletionAsync(
id, getInputsAndOutputs: true, cts.Token))
.ToArray();

OrchestrationMetadata[] results = await Task.WhenAll(waitTasks);

for (int i = 0; i < results.Length; i++)
{
Assert.NotNull(results[i]);
Assert.Equal(OrchestrationRuntimeStatus.Completed, results[i].RuntimeStatus);
}

this.output.WriteLine($"All {allInstanceIds.Count} completed");
}

#region Models

public enum Status
{
NotStarted,
InProgress,
Succeeded,
}

public class AsyncOperation
{
public Status Status { get; set; }

public bool Closed { get; set; }

public int IterationCount { get; set; }

public void Update(AsyncOperation result)
{
this.Status = result.Status;
this.IterationCount++;

if (this.Status == Status.Succeeded)
{
this.Closed = true;
}
}
}

#endregion
}