diff --git a/dotnet/agent-framework-dotnet.slnx b/dotnet/agent-framework-dotnet.slnx index 04fbb6cd87..3cfafb9a49 100644 --- a/dotnet/agent-framework-dotnet.slnx +++ b/dotnet/agent-framework-dotnet.slnx @@ -57,6 +57,7 @@ + diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Agent_Step19_InFunctionLoopCheckpointing.csproj b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Agent_Step19_InFunctionLoopCheckpointing.csproj new file mode 100644 index 0000000000..41aafe3437 --- /dev/null +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Agent_Step19_InFunctionLoopCheckpointing.csproj @@ -0,0 +1,20 @@ + + + + Exe + net10.0 + + enable + enable + + + + + + + + + + + + diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs new file mode 100644 index 0000000000..3b886a55e6 --- /dev/null +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/Program.cs @@ -0,0 +1,224 @@ +// Copyright (c) Microsoft. All rights reserved. + +// This sample demonstrates how the PersistChatHistoryAfterEachServiceCall option causes +// chat history to be persisted after each individual call to the AI service, rather than +// only at the end of the full agent run. When an agent uses tools, FunctionInvokingChatClient +// loops multiple times (service call → tool execution → service call), and by default the +// chat history is only persisted once the entire loop finishes. With this option enabled, +// intermediate messages (tool calls and results) are persisted after each service call, +// allowing you to inspect or recover them even if the process is interrupted mid-loop. +// +// The sample runs two multi-turn conversations: one using non-streaming (RunAsync) and one +// using streaming (RunStreamingAsync), to demonstrate correct behavior in both modes. + +using System.ComponentModel; +using Azure.AI.OpenAI; +using Azure.Identity; +using Microsoft.Agents.AI; +using Microsoft.Extensions.AI; +using OpenAI.Responses; + +var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set."); +var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini"; +var store = Environment.GetEnvironmentVariable("AZURE_OPENAI_RESPONSES_STORE") ?? "false"; + +// WARNING: DefaultAzureCredential is convenient for development but requires careful consideration in production. +// In production, consider using a specific credential (e.g., ManagedIdentityCredential) to avoid +// latency issues, unintended credential probing, and potential security risks from fallback mechanisms. +AzureOpenAIClient openAIClient = new(new Uri(endpoint), new DefaultAzureCredential()); + +// Define multiple tools so the model makes several tool calls in a single run. +[Description("Get the current weather for a city.")] +static string GetWeather([Description("The city name.")] string city) => + city.ToUpperInvariant() switch + { + "SEATTLE" => "Seattle: 55°F, cloudy with light rain.", + "NEW YORK" => "New York: 72°F, sunny and warm.", + "LONDON" => "London: 48°F, overcast with fog.", + "DUBLIN" => "Dublin: 43°F, overcast with fog.", + _ => $"{city}: weather data not available." + }; + +[Description("Get the current time in a city.")] +static string GetTime([Description("The city name.")] string city) => + city.ToUpperInvariant() switch + { + "SEATTLE" => "Seattle: 9:00 AM PST", + "NEW YORK" => "New York: 12:00 PM EST", + "LONDON" => "London: 5:00 PM GMT", + "DUBLIN" => "Dublin: 5:00 PM GMT", + _ => $"{city}: time data not available." + }; + +// Create the agent with PersistChatHistoryAfterEachServiceCall enabled. +// The in-memory ChatHistoryProvider is used by default when the service does not require service stored chat +// history, so for those cases, we can inspect the chat history via session.TryGetInMemoryChatHistory(). +IChatClient chatClient = string.Equals(store, "TRUE", StringComparison.OrdinalIgnoreCase) ? + openAIClient.GetResponsesClient(deploymentName).AsIChatClient() : + openAIClient.GetResponsesClient(deploymentName).AsIChatClientWithStoredOutputDisabled(); +AIAgent agent = chatClient.AsAIAgent( + new ChatClientAgentOptions + { + Name = "WeatherAssistant", + ChatOptions = new() + { + Instructions = "You are a helpful assistant. When asked about multiple cities, call the appropriate tool for each city.", + Tools = [AIFunctionFactory.Create(GetWeather), AIFunctionFactory.Create(GetTime)] + }, + PersistChatHistoryAfterEachServiceCall = true, + }); + +await RunNonStreamingAsync(); +await RunStreamingAsync(); + +async Task RunNonStreamingAsync() +{ + int lastChatHistorySize = 0; + string lastConversationId = string.Empty; + + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine("\n=== Non-Streaming Mode ==="); + Console.ResetColor(); + + AgentSession session = await agent.CreateSessionAsync(); + + // First turn — ask about multiple cities so the model calls tools. + const string Prompt = "What's the weather and time in Seattle, New York, and London?"; + PrintUserMessage(Prompt); + + var response = await agent.RunAsync(Prompt, session); + PrintAgentResponse(response.Text); + PrintChatHistory(session, "After run", ref lastChatHistorySize, ref lastConversationId); + + // Second turn — follow-up to verify chat history is correct. + const string FollowUp1 = "And Dublin?"; + PrintUserMessage(FollowUp1); + + response = await agent.RunAsync(FollowUp1, session); + PrintAgentResponse(response.Text); + PrintChatHistory(session, "After second run", ref lastChatHistorySize, ref lastConversationId); + + // Third turn — follow-up to verify chat history is correct. + const string FollowUp2 = "Which city is the warmest?"; + PrintUserMessage(FollowUp2); + + response = await agent.RunAsync(FollowUp2, session); + PrintAgentResponse(response.Text); + PrintChatHistory(session, "After third run", ref lastChatHistorySize, ref lastConversationId); +} + +async Task RunStreamingAsync() +{ + int lastChatHistorySize = 0; + string lastConversationId = string.Empty; + + Console.ForegroundColor = ConsoleColor.Yellow; + Console.WriteLine("\n=== Streaming Mode ==="); + Console.ResetColor(); + + AgentSession session = await agent.CreateSessionAsync(); + + // First turn — ask about multiple cities so the model calls tools. + const string Prompt = "What's the weather and time in Seattle, New York, and London?"; + PrintUserMessage(Prompt); + + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + + await foreach (var update in agent.RunStreamingAsync(Prompt, session)) + { + Console.Write(update); + + // During streaming we should be able to see updates to the chat history + // before the full run completes, as each service call is made and persisted. + PrintChatHistory(session, "During run", ref lastChatHistorySize, ref lastConversationId); + } + + Console.WriteLine(); + PrintChatHistory(session, "After run", ref lastChatHistorySize, ref lastConversationId); + + // Second turn — follow-up to verify chat history is correct. + const string FollowUp1 = "And Dublin?"; + PrintUserMessage(FollowUp1); + + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + + await foreach (var update in agent.RunStreamingAsync(FollowUp1, session)) + { + Console.Write(update); + + // During streaming we should be able to see updates to the chat history + // before the full run completes, as each service call is made and persisted. + PrintChatHistory(session, "During second run", ref lastChatHistorySize, ref lastConversationId); + } + + Console.WriteLine(); + PrintChatHistory(session, "After second run", ref lastChatHistorySize, ref lastConversationId); + + // Third turn — follow-up to verify chat history is correct. + const string FollowUp2 = "Which city is the warmest?"; + PrintUserMessage(FollowUp2); + + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + + await foreach (var update in agent.RunStreamingAsync(FollowUp2, session)) + { + Console.Write(update); + + // During streaming we should be able to see updates to the chat history + // before the full run completes, as each service call is made and persisted. + PrintChatHistory(session, "During third run", ref lastChatHistorySize, ref lastConversationId); + } + + Console.WriteLine(); + PrintChatHistory(session, "After third run", ref lastChatHistorySize, ref lastConversationId); +} + +void PrintUserMessage(string message) +{ + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[User] "); + Console.ResetColor(); + Console.WriteLine(message); +} + +void PrintAgentResponse(string? text) +{ + Console.ForegroundColor = ConsoleColor.Cyan; + Console.Write("\n[Agent] "); + Console.ResetColor(); + Console.WriteLine(text); +} + +// Helper to print the current chat history from the session. +void PrintChatHistory(AgentSession session, string label, ref int lastChatHistorySize, ref string lastConversationId) +{ + if (session.TryGetInMemoryChatHistory(out var history) && history.Count != lastChatHistorySize) + { + Console.ForegroundColor = ConsoleColor.DarkGray; + Console.WriteLine($"\n [{label} — Chat history: {history.Count} message(s)]"); + foreach (var msg in history) + { + var preview = msg.Text?.Length > 80 ? msg.Text[..80] + "…" : msg.Text; + var contentTypes = string.Join(", ", msg.Contents.Select(c => c.GetType().Name)); + Console.WriteLine($" {msg.Role,-12} | {(string.IsNullOrWhiteSpace(preview) ? $"[{contentTypes}]" : preview)}"); + } + + Console.ResetColor(); + + lastChatHistorySize = history.Count; + } + + if (session is ChatClientAgentSession ccaSession && ccaSession.ConversationId is not null && ccaSession.ConversationId != lastConversationId) + { + Console.ForegroundColor = ConsoleColor.DarkGray; + Console.WriteLine($" [{label} — Conversation ID: {ccaSession.ConversationId}]"); + Console.ResetColor(); + lastConversationId = ccaSession.ConversationId; + } +} diff --git a/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md new file mode 100644 index 0000000000..3c3e8a2c30 --- /dev/null +++ b/dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing/README.md @@ -0,0 +1,62 @@ +# In-Function-Loop Checkpointing + +This sample demonstrates how the `PersistChatHistoryAfterEachServiceCall` option on `ChatClientAgentOptions` causes chat history to be saved after each individual call to the AI service, rather than only at the end of the full agent run. + +## What This Sample Shows + +When an agent uses tools, the `FunctionInvokingChatClient` loops multiple times (service call → tool execution → service call → …). By default, chat history is only persisted once the entire loop finishes. With `PersistChatHistoryAfterEachServiceCall` enabled: + +- A `ChatHistoryPersistingChatClient` decorator is automatically inserted into the chat client pipeline +- After each service call, the decorator notifies the `ChatHistoryProvider` (and any `AIContextProvider` instances) with the new messages +- Only **new** messages are sent to providers on each notification — messages that were already persisted in an earlier call within the same run are deduplicated automatically +- The end-of-run persistence in `ChatClientAgent` is skipped to avoid double-persisting + +This is useful for: +- **Crash recovery** — if the process is interrupted mid-loop, the intermediate tool calls and results are already persisted +- **Observability** — you can inspect the chat history while the agent is still running (e.g., during streaming) +- **Long-running tool loops** — agents with many sequential tool calls benefit from incremental persistence + +## How It Works + +The sample asks the agent about the weather and time in three cities. The model calls the `GetWeather` and `GetTime` tools for each city, resulting in multiple service calls within a single `RunStreamingAsync` invocation. After the run completes, the sample prints the full chat history to show all the intermediate messages that were persisted along the way. + +### Pipeline Architecture + +``` +ChatClientAgent + └─ FunctionInvokingChatClient (handles tool call loop) + └─ ChatHistoryPersistingChatClient (persists after each service call) + └─ Leaf IChatClient (Azure OpenAI) +``` + +## Prerequisites + +- .NET 10 SDK or later +- Azure OpenAI service endpoint and model deployment +- Azure CLI installed and authenticated + +**Note**: This sample uses `DefaultAzureCredential`. Sign in with `az login` before running. For production, prefer a specific credential such as `ManagedIdentityCredential`. For more information, see the [Azure CLI authentication documentation](https://learn.microsoft.com/cli/azure/authenticate-azure-cli-interactively). + +## Environment Variables + +```powershell +$env:AZURE_OPENAI_ENDPOINT="https://your-resource.openai.azure.com/" # Required +$env:AZURE_OPENAI_DEPLOYMENT_NAME="gpt-4o-mini" # Optional, defaults to gpt-4o-mini +``` + +## Running the Sample + +```powershell +cd dotnet/samples/02-agents/Agents/Agent_Step19_InFunctionLoopCheckpointing +dotnet run +``` + +## Expected Behavior + +The sample runs two conversation turns: + +1. **First turn** — asks about weather and time in three cities. The model calls `GetWeather` and `GetTime` tools (potentially in parallel or sequentially), then provides a summary. The chat history dump after the run shows all the intermediate tool call and result messages. + +2. **Second turn** — asks a follow-up question ("Which city is the warmest?") that uses the persisted conversation context. The chat history dump shows the full accumulated conversation. + +The chat history printout uses `session.TryGetInMemoryChatHistory()` to inspect the in-memory storage. diff --git a/dotnet/samples/02-agents/Agents/README.md b/dotnet/samples/02-agents/Agents/README.md index 4ac53ba246..c5258ba9f4 100644 --- a/dotnet/samples/02-agents/Agents/README.md +++ b/dotnet/samples/02-agents/Agents/README.md @@ -45,6 +45,7 @@ Before you begin, ensure you have the following prerequisites: |[Declarative agent](./Agent_Step16_Declarative/)|This sample demonstrates how to declaratively define an agent.| |[Providing additional AI Context to an agent using multiple AIContextProviders](./Agent_Step17_AdditionalAIContext/)|This sample demonstrates how to inject additional AI context into a ChatClientAgent using multiple custom AIContextProvider components that are attached to the agent.| |[Using compaction pipeline with an agent](./Agent_Step18_CompactionPipeline/)|This sample demonstrates how to use a compaction pipeline to efficiently limit the size of the conversation history for an agent.| +|[In-function-loop checkpointing](./Agent_Step19_InFunctionLoopCheckpointing/)|This sample demonstrates how to persist chat history after each service call during a tool-calling loop, enabling crash recovery and mid-run observability.| ## Running the samples from the console diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs index adb6eb9f83..eb37e853c3 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgent.cs @@ -212,11 +212,9 @@ protected override async Task RunCoreAsync( await this.PrepareSessionAndMessagesAsync(session, inputMessages, options, cancellationToken).ConfigureAwait(false); var chatClient = this.ChatClient; - chatClient = ApplyRunOptionsTransformations(options, chatClient); var loggingAgentName = this.GetLoggingAgentName(); - this._logger.LogAgentChatClientInvokingAgent(nameof(RunAsync), this.Id, loggingAgentName, this._chatClientType); // Call the IChatClient and notify the AIContextProvider of any failures. @@ -227,8 +225,7 @@ protected override async Task RunCoreAsync( } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, inputMessagesForChatClient, chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, inputMessagesForChatClient, cancellationToken).ConfigureAwait(false); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, inputMessagesForChatClient, chatOptions, cancellationToken).ConfigureAwait(false); throw; } @@ -236,7 +233,7 @@ protected override async Task RunCoreAsync( // We can derive the type of supported session from whether we have a conversation id, // so let's update it and set the conversation id for the service session case. - this.UpdateSessionConversationId(safeSession, chatResponse.ConversationId, cancellationToken); + this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken); // Ensure that the author name is set for each message in the response. foreach (ChatMessage chatResponseMessage in chatResponse.Messages) @@ -244,11 +241,8 @@ protected override async Task RunCoreAsync( chatResponseMessage.AuthorName ??= this.Name; } - // Only notify the session of new messages if the chatResponse was successful to avoid inconsistent message state in the session. - await this.NotifyChatHistoryProviderOfNewMessagesAsync(safeSession, inputMessagesForChatClient, chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); - - // Notify the AIContextProvider of all new messages. - await this.NotifyAIContextProviderOfSuccessAsync(safeSession, inputMessagesForChatClient, chatResponse.Messages, cancellationToken).ConfigureAwait(false); + // Notify providers of all new messages unless persistence is handled per-service-call by the decorator. + await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, inputMessagesForChatClient, chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); return new AgentResponse(chatResponse) { @@ -315,8 +309,7 @@ protected override async IAsyncEnumerable RunCoreStreamingA } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), cancellationToken).ConfigureAwait(false); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } @@ -330,8 +323,7 @@ protected override async IAsyncEnumerable RunCoreStreamingA } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), cancellationToken).ConfigureAwait(false); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } @@ -357,8 +349,7 @@ protected override async IAsyncEnumerable RunCoreStreamingA } catch (Exception ex) { - await this.NotifyChatHistoryProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); - await this.NotifyAIContextProviderOfFailureAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), cancellationToken).ConfigureAwait(false); + await this.NotifyProvidersOfFailureAtEndOfRunAsync(safeSession, ex, GetInputMessages(inputMessagesForChatClient, continuationToken), chatOptions, cancellationToken).ConfigureAwait(false); throw; } } @@ -367,13 +358,10 @@ protected override async IAsyncEnumerable RunCoreStreamingA // We can derive the type of supported session from whether we have a conversation id, // so let's update it and set the conversation id for the service session case. - this.UpdateSessionConversationId(safeSession, chatResponse.ConversationId, cancellationToken); - - // To avoid inconsistent state we only notify the session of the input messages if no error occurs after the initial request. - await this.NotifyChatHistoryProviderOfNewMessagesAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); + this.UpdateSessionConversationIdAtEndOfRun(safeSession, chatResponse.ConversationId, cancellationToken); - // Notify the AIContextProvider of all new messages. - await this.NotifyAIContextProviderOfSuccessAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, cancellationToken).ConfigureAwait(false); + // Notify providers of all new messages unless persistence is handled per-service-call by the decorator. + await this.NotifyProvidersOfNewMessagesAtEndOfRunAsync(safeSession, GetInputMessages(inputMessagesForChatClient, continuationToken), chatResponse.Messages, chatOptions, cancellationToken).ConfigureAwait(false); } /// @@ -441,17 +429,29 @@ protected override ValueTask DeserializeSessionCoreAsync(JsonEleme #region Private /// - /// Notify the when an agent run succeeded, if there is an . + /// Notifies the and all of successfully completed messages. /// - private async Task NotifyAIContextProviderOfSuccessAsync( + /// + /// This method is also called by to persist messages per-service-call. + /// + internal async Task NotifyProvidersOfNewMessagesAsync( ChatClientAgentSession session, - IEnumerable inputMessages, + IEnumerable requestMessages, IEnumerable responseMessages, + ChatOptions? chatOptions, CancellationToken cancellationToken) { + ChatHistoryProvider? chatHistoryProvider = this.ResolveChatHistoryProvider(chatOptions, session); + + if (chatHistoryProvider is not null) + { + var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, responseMessages); + await chatHistoryProvider.InvokedAsync(invokedContext, cancellationToken).ConfigureAwait(false); + } + if (this.AIContextProviders is { Count: > 0 } contextProviders) { - AIContextProvider.InvokedContext invokedContext = new(this, session, inputMessages, responseMessages); + AIContextProvider.InvokedContext invokedContext = new(this, session, requestMessages, responseMessages); foreach (var contextProvider in contextProviders) { @@ -461,17 +461,29 @@ private async Task NotifyAIContextProviderOfSuccessAsync( } /// - /// Notify the of any failure during an agent run, if there is an . + /// Notifies the and all of a failure during a service call. /// - private async Task NotifyAIContextProviderOfFailureAsync( + /// + /// This method is also called by to report failures per-service-call. + /// + internal async Task NotifyProvidersOfFailureAsync( ChatClientAgentSession session, Exception ex, - IEnumerable inputMessages, + IEnumerable requestMessages, + ChatOptions? chatOptions, CancellationToken cancellationToken) { + ChatHistoryProvider? chatHistoryProvider = this.ResolveChatHistoryProvider(chatOptions, session); + + if (chatHistoryProvider is not null) + { + var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, ex); + await chatHistoryProvider.InvokedAsync(invokedContext, cancellationToken).ConfigureAwait(false); + } + if (this.AIContextProviders is { Count: > 0 } contextProviders) { - AIContextProvider.InvokedContext invokedContext = new(this, session, inputMessages, ex); + AIContextProvider.InvokedContext invokedContext = new(this, session, requestMessages, ex); foreach (var contextProvider in contextProviders) { @@ -754,7 +766,7 @@ private async Task return (typedSession, chatOptions, messagesList, continuationToken); } - private void UpdateSessionConversationId(ChatClientAgentSession session, string? responseConversationId, CancellationToken cancellationToken) + internal void UpdateSessionConversationId(ChatClientAgentSession session, string? responseConversationId, CancellationToken cancellationToken) { if (string.IsNullOrWhiteSpace(responseConversationId) && !string.IsNullOrWhiteSpace(session.ConversationId)) { @@ -798,47 +810,77 @@ private void UpdateSessionConversationId(ChatClientAgentSession session, string? } } - private Task NotifyChatHistoryProviderOfFailureAsync( + /// + /// Notifies providers of successfully completed messages at the end of an agent run. + /// + /// + /// When is , the + /// decorator handles per-service-call notification, + /// so this end-of-run notification is skipped. + /// + /// + /// Updates the session conversation ID at the end of an agent run. + /// + /// + /// When is , the + /// decorator handles per-service-call conversation ID updates, + /// so this end-of-run update is skipped. + /// + private void UpdateSessionConversationIdAtEndOfRun(ChatClientAgentSession session, string? responseConversationId, CancellationToken cancellationToken) + { + if (this.PersistsChatHistoryPerServiceCall) + { + return; + } + + this.UpdateSessionConversationId(session, responseConversationId, cancellationToken); + } + + private Task NotifyProvidersOfNewMessagesAtEndOfRunAsync( ChatClientAgentSession session, - Exception ex, IEnumerable requestMessages, + IEnumerable responseMessages, ChatOptions? chatOptions, CancellationToken cancellationToken) { - ChatHistoryProvider? provider = this.ResolveChatHistoryProvider(chatOptions, session); - - // Only notify the provider if we have one. - // If we don't have one, it means that the chat history is service managed and the underlying service is responsible for storing messages. - if (provider is not null) + if (this.PersistsChatHistoryPerServiceCall) { - var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, ex); - - return provider.InvokedAsync(invokedContext, cancellationToken).AsTask(); + return Task.CompletedTask; } - return Task.CompletedTask; + return this.NotifyProvidersOfNewMessagesAsync(session, requestMessages, responseMessages, chatOptions, cancellationToken); } - private Task NotifyChatHistoryProviderOfNewMessagesAsync( + /// + /// Notifies providers of a failure at the end of an agent run. + /// + /// + /// When is , the + /// decorator handles per-service-call notification, + /// so this end-of-run notification is skipped. + /// + private Task NotifyProvidersOfFailureAtEndOfRunAsync( ChatClientAgentSession session, + Exception ex, IEnumerable requestMessages, - IEnumerable responseMessages, ChatOptions? chatOptions, CancellationToken cancellationToken) { - ChatHistoryProvider? provider = this.ResolveChatHistoryProvider(chatOptions, session); - - // Only notify the provider if we have one. - // If we don't have one, it means that the chat history is service managed and the underlying service is responsible for storing messages. - if (provider is not null) + if (this.PersistsChatHistoryPerServiceCall) { - var invokedContext = new ChatHistoryProvider.InvokedContext(this, session, requestMessages, responseMessages); - return provider.InvokedAsync(invokedContext, cancellationToken).AsTask(); + return Task.CompletedTask; } - return Task.CompletedTask; + return this.NotifyProvidersOfFailureAsync(session, ex, requestMessages, chatOptions, cancellationToken); } + /// + /// Gets a value indicating whether the agent is configured to persist chat history after each individual service call + /// via a decorator. + /// + private bool PersistsChatHistoryPerServiceCall => + this._agentOptions?.PersistChatHistoryAfterEachServiceCall is true && this._agentOptions?.UseProvidedChatClientAsIs is not true; + private ChatHistoryProvider? ResolveChatHistoryProvider(ChatOptions? chatOptions, ChatClientAgentSession session) { ChatHistoryProvider? provider = session.ConversationId is null ? this.ChatHistoryProvider : null; diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs index 38cad40bbe..d4e5fe58c0 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientAgentOptions.cs @@ -1,7 +1,9 @@ // Copyright (c) Microsoft. All rights reserved. using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using Microsoft.Extensions.AI; +using Microsoft.Shared.DiagnosticIds; namespace Microsoft.Agents.AI; @@ -89,6 +91,51 @@ public sealed class ChatClientAgentOptions /// public bool ThrowOnChatHistoryProviderConflict { get; set; } = true; + /// + /// Gets or sets a value indicating whether to persist chat history after each individual service call + /// rather than only at the end of the full agent run. + /// + /// + /// + /// By default, persists request and response messages via the + /// or persists the latest service provided + /// conversation id only after the full run completes, which may include multiple + /// iterations of the function invocation loop. Setting this property to causes + /// messages or conversation ids to be persisted after each individual call to the underlying AI service, so that intermediate + /// progress (e.g., tool calls and results) is saved even if the process is interrupted mid-loop. + /// + /// + /// Note that when using an AI service with built in chat history storage, which uses a single threaded conversation model (e.g. OpenAI Responses with the Conversations API) + /// setting this setting to will have no effect. This type of service updates the single conversation with each service call, + /// and there is no way to revert to a previous state. + /// + /// + /// On the other hand, when using an AI service with built in chat history storage, which supports forking, (e.g. OpenAI Responses with Response Ids) + /// setting this setting to will mean that the will only persist the last returned response id at + /// the end of the run, whereas setting this setting to will mean that the will persist each returned + /// response id after each service call. This means that the last successful response id will always be available in the . + /// + /// + /// It's important to note that enabling this setting may leave your chat history in a state where is required to start a new run. + /// If the last successful service call returned it is not possible to continue the session until a + /// is provided as input for a subsequent run. + /// + /// + /// When this option is enabled, a decorator is automatically + /// inserted into the chat client pipeline between the and the + /// leaf , and the will not perform its own + /// end-of-run chat history persistence to avoid double-persisting messages. + /// + /// + /// This option has no effect when is . + /// + /// + /// + /// Default is . + /// + [Experimental(DiagnosticIds.Experiments.AgentsAIExperiments)] + public bool PersistChatHistoryAfterEachServiceCall { get; set; } + /// /// Creates a new instance of with the same values as this instance. /// @@ -105,5 +152,6 @@ public ChatClientAgentOptions Clone() ClearOnChatHistoryProviderConflict = this.ClearOnChatHistoryProviderConflict, WarnOnChatHistoryProviderConflict = this.WarnOnChatHistoryProviderConflict, ThrowOnChatHistoryProviderConflict = this.ThrowOnChatHistoryProviderConflict, + PersistChatHistoryAfterEachServiceCall = this.PersistChatHistoryAfterEachServiceCall, }; } diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs index 8290c39974..03c283f74e 100644 --- a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatClientExtensions.cs @@ -63,6 +63,17 @@ internal static IChatClient WithDefaultAgentMiddleware(this IChatClient chatClie }); } + // ChatHistoryPersistingChatClient is registered after FunctionInvokingChatClient so that it sits + // between FIC and the leaf client. ChatClientBuilder.Build applies factories in reverse order, + // making the first Use() call outermost. By adding our decorator second, the resulting pipeline is: + // FunctionInvokingChatClient → ChatHistoryPersistingChatClient → leaf IChatClient + // This allows the decorator to persist messages after each individual service call within + // FIC's function invocation loop. + if (options?.PersistChatHistoryAfterEachServiceCall is true) + { + chatBuilder.Use(innerClient => new ChatHistoryPersistingChatClient(innerClient)); + } + var agentChatClient = chatBuilder.Build(services); if (options?.ChatOptions?.Tools is { Count: > 0 }) diff --git a/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs new file mode 100644 index 0000000000..0bff151710 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI/ChatClient/ChatHistoryPersistingChatClient.cs @@ -0,0 +1,240 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI; + +/// +/// A delegating chat client that notifies and +/// instances of request and response messages after each individual call to the inner chat client. +/// +/// +/// +/// This decorator is intended to operate between the and the leaf +/// in a pipeline. It ensures that providers are notified +/// after each service call rather than only at the end of the full agent run, so that intermediate messages +/// (e.g., tool calls and results) are saved even if the process is interrupted mid-loop. +/// +/// +/// This chat client must be used within the context of a running . It retrieves the +/// current agent and session from , which is set automatically when an agent's +/// or +/// +/// method is called. An is thrown if no run context is available or if the +/// agent is not a . +/// +/// +internal sealed class ChatHistoryPersistingChatClient : DelegatingChatClient +{ + /// + /// The key used in and + /// to mark messages and their content as already persisted to chat history. + /// + internal const string PersistedMarkerKey = "_chatHistoryPersisted"; + + /// + /// Initializes a new instance of the class. + /// + /// The underlying chat client that will handle the core operations. + public ChatHistoryPersistingChatClient(IChatClient innerClient) + : base(innerClient) + { + } + + /// + public override async Task GetResponseAsync( + IEnumerable messages, + ChatOptions? options = null, + CancellationToken cancellationToken = default) + { + var (agent, session) = GetRequiredAgentAndSession(); + + ChatResponse response; + try + { + response = await base.GetResponseAsync(messages, options, cancellationToken).ConfigureAwait(false); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + + var newRequestMessages = GetNewRequestMessages(messages); + agent.UpdateSessionConversationId(session, response.ConversationId, cancellationToken); + await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, response.Messages, options, cancellationToken).ConfigureAwait(false); + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(response.Messages); + + return response; + } + + /// + public override async IAsyncEnumerable GetStreamingResponseAsync( + IEnumerable messages, + ChatOptions? options = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + var (agent, session) = GetRequiredAgentAndSession(); + + List responseUpdates = []; + + IAsyncEnumerator enumerator; + try + { + enumerator = base.GetStreamingResponseAsync(messages, options, cancellationToken).GetAsyncEnumerator(cancellationToken); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + + bool hasUpdates; + try + { + hasUpdates = await enumerator.MoveNextAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + + while (hasUpdates) + { + var update = enumerator.Current; + responseUpdates.Add(update); + yield return update; + + try + { + hasUpdates = await enumerator.MoveNextAsync().ConfigureAwait(false); + } + catch (Exception ex) + { + var newRequestMessagesOnFailure = GetNewRequestMessages(messages); + await agent.NotifyProvidersOfFailureAsync(session, ex, newRequestMessagesOnFailure, options, cancellationToken).ConfigureAwait(false); + throw; + } + } + + var chatResponse = responseUpdates.ToChatResponse(); + var newRequestMessages = GetNewRequestMessages(messages); + agent.UpdateSessionConversationId(session, chatResponse.ConversationId, cancellationToken); + await agent.NotifyProvidersOfNewMessagesAsync(session, newRequestMessages, chatResponse.Messages, options, cancellationToken).ConfigureAwait(false); + MarkAsPersisted(newRequestMessages); + MarkAsPersisted(chatResponse.Messages); + } + + /// + /// Gets the current and from the run context. + /// + private static (ChatClientAgent Agent, ChatClientAgentSession Session) GetRequiredAgentAndSession() + { + var runContext = AIAgent.CurrentRunContext + ?? throw new InvalidOperationException( + $"{nameof(ChatHistoryPersistingChatClient)} can only be used within the context of a running AIAgent. " + + "Ensure that the chat client is being invoked as part of an AIAgent.RunAsync or AIAgent.RunStreamingAsync call."); + + if (runContext.Agent is not ChatClientAgent chatClientAgent) + { + throw new InvalidOperationException( + $"{nameof(ChatHistoryPersistingChatClient)} can only be used with a {nameof(ChatClientAgent)}. " + + $"The current agent is of type '{runContext.Agent.GetType().Name}'."); + } + + if (runContext.Session is not ChatClientAgentSession chatClientAgentSession) + { + throw new InvalidOperationException( + $"{nameof(ChatHistoryPersistingChatClient)} requires a {nameof(ChatClientAgentSession)}. " + + $"The current session is of type '{runContext.Session?.GetType().Name ?? "null"}'."); + } + + return (chatClientAgent, chatClientAgentSession); + } + + /// + /// Returns only the request messages that have not yet been persisted to chat history. + /// + /// + /// A message is considered already persisted if any of the following is true: + /// + /// It has the in its . + /// It has an of + /// (indicating it was loaded from chat history and does not need to be re-persisted). + /// It has and all of its items have the + /// in their . This handles the + /// streaming case where reconstructs objects + /// independently via ToChatResponse(), producing different object references that share the same + /// underlying instances. + /// + /// + /// A list of request messages that have not yet been persisted. + /// The full set of request messages to filter. + private static List GetNewRequestMessages(IEnumerable messages) + { + return messages.Where(m => !IsAlreadyPersisted(m)).ToList(); + } + + /// + /// Determines whether a message has already been persisted to chat history by this decorator. + /// + private static bool IsAlreadyPersisted(ChatMessage message) + { + if (message.AdditionalProperties?.TryGetValue(PersistedMarkerKey, out var value) == true && value is true) + { + return true; + } + + if (message.GetAgentRequestMessageSourceType() == AgentRequestMessageSourceType.ChatHistory) + { + return true; + } + + // In streaming mode, FunctionInvokingChatClient reconstructs ChatMessage objects via ToChatResponse() + // independently, producing different ChatMessage instances. However, the underlying AIContent objects + // (e.g., FunctionCallContent, FunctionResultContent) are shared references. Checking for markers on + // AIContent handles dedup in this case. + if (message.Contents.Count > 0 && message.Contents.All(c => c.AdditionalProperties?.TryGetValue(PersistedMarkerKey, out var value) == true && value is true)) + { + return true; + } + + return false; + } + + /// + /// Marks the given messages as persisted by setting a marker on both the + /// and each of its items. + /// + /// + /// Both levels are marked because may reconstruct + /// objects in streaming mode (losing the message-level marker), + /// but the references are shared and retain their markers. + /// + /// The messages to mark as persisted. + private static void MarkAsPersisted(IEnumerable messages) + { + foreach (var message in messages) + { + message.AdditionalProperties ??= new(); + message.AdditionalProperties[PersistedMarkerKey] = true; + + foreach (var content in message.Contents) + { + content.AdditionalProperties ??= new(); + content.AdditionalProperties[PersistedMarkerKey] = true; + } + } + } +} diff --git a/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs new file mode 100644 index 0000000000..32bd718a1c --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.UnitTests/ChatClient/ChatHistoryPersistingChatClientTests.cs @@ -0,0 +1,769 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.DependencyInjection; +using Moq; +using Moq.Protected; + +namespace Microsoft.Agents.AI.UnitTests; + +/// +/// Contains unit tests for the decorator, +/// verifying that it persists messages via the after each +/// individual service call when the +/// option is enabled. +/// +public class ChatHistoryPersistingChatClientTests +{ + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled, + /// the ChatHistoryProvider receives messages after a successful non-streaming call. + /// + [Fact] + public async Task RunAsync_PersistsMessagesPerServiceCall_WhenOptionEnabledAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — InvokedCoreAsync should be called by the decorator (per service call) + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.RequestMessages.Any(m => m.Text == "test") && + x.ResponseMessages!.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is disabled (default), + /// the ChatHistoryProvider still receives messages at end-of-run as before. + /// + [Fact] + public async Task RunAsync_PersistsMessagesAtEndOfRun_WhenOptionDisabledAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = false, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — InvokedCoreAsync should be called once by the agent (end of run) + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.RequestMessages.Any(m => m.Text == "test") && + x.ResponseMessages!.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled and the service call fails, + /// the ChatHistoryProvider is notified with the exception. + /// + [Fact] + public async Task RunAsync_NotifiesProviderOfFailure_WhenOptionEnabledAndServiceFailsAsync() + { + // Arrange + var expectedException = new InvalidOperationException("Service failed"); + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ThrowsAsync(expectedException); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await Assert.ThrowsAsync(() => agent.RunAsync([new(ChatRole.User, "test")], session)); + + // Assert — the decorator should have notified the provider of the failure + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.InvokeException != null && + x.InvokeException.Message == "Service failed"), + ItExpr.IsAny()); + } + + /// + /// Verifies that the decorator is injected into the pipeline when the option is set + /// and can be discovered via GetService. + /// + [Fact] + public void ChatClient_ContainsDecorator_WhenOptionEnabled() + { + // Arrange + Mock mockService = new(); + + // Act + ChatClientAgent agent = new(mockService.Object, options: new() + { + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Assert + var decorator = agent.ChatClient.GetService(); + Assert.NotNull(decorator); + } + + /// + /// Verifies that the decorator is NOT injected into the pipeline when the option is not set. + /// + [Fact] + public void ChatClient_DoesNotContainDecorator_WhenOptionDisabled() + { + // Arrange + Mock mockService = new(); + + // Act + ChatClientAgent agent = new(mockService.Object, options: new() + { + PersistChatHistoryAfterEachServiceCall = false, + }); + + // Assert + var decorator = agent.ChatClient.GetService(); + Assert.Null(decorator); + } + + /// + /// Verifies that the decorator is NOT injected when UseProvidedChatClientAsIs is true, + /// even if PersistChatHistoryAfterEachServiceCall is also true. + /// + [Fact] + public void ChatClient_DoesNotContainDecorator_WhenUseProvidedChatClientAsIs() + { + // Arrange + Mock mockService = new(); + + // Act + ChatClientAgent agent = new(mockService.Object, options: new() + { + PersistChatHistoryAfterEachServiceCall = true, + UseProvidedChatClientAsIs = true, + }); + + // Assert + var decorator = agent.ChatClient.GetService(); + Assert.Null(decorator); + } + + /// + /// Verifies that the PersistChatHistoryAfterEachServiceCall option is included in Clone(). + /// + [Fact] + public void ChatClientAgentOptions_Clone_IncludesPersistChatHistoryAfterEachServiceCall() + { + // Arrange + var options = new ChatClientAgentOptions + { + PersistChatHistoryAfterEachServiceCall = true, + }; + + // Act + var cloned = options.Clone(); + + // Assert + Assert.True(cloned.PersistChatHistoryAfterEachServiceCall); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled and the service call + /// involves a function invocation loop, the ChatHistoryProvider is called after each individual + /// service call (not just once at the end). + /// + [Fact] + public async Task RunAsync_PersistsPerServiceCall_DuringFunctionInvocationLoopAsync() + { + // Arrange + int serviceCallCount = 0; + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(() => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + // First call returns a tool call + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, [new FunctionCallContent("call1", "myTool", new Dictionary())])])); + } + + // Second call returns a final response + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, "final response")])); + }); + + var invokedContexts = new List(); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Callback((ChatHistoryProvider.InvokedContext ctx, CancellationToken _) => invokedContexts.Add(ctx)) + .Returns(() => new ValueTask()); + + // Define a simple tool + var tool = AIFunctionFactory.Create(() => "tool result", "myTool", "A test tool"); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatOptions = new() { Tools = [tool] }, + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }, services: new ServiceCollection().BuildServiceProvider()); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + Exception? caughtException = null; + try + { + await agent.RunAsync([new(ChatRole.User, "test")], session); + } + catch (Exception ex) + { + caughtException = ex; + } + + // Diagnostic: check if there was an unexpected exception + Assert.Null(caughtException); + + // Assert — the decorator should have been called twice (once per service call in the function invocation loop) + Assert.Equal(2, serviceCallCount); + Assert.Equal(2, invokedContexts.Count); + + // First invocation should have the user message as request and tool call response + Assert.NotNull(invokedContexts[0].ResponseMessages); + var firstRequestMessages = invokedContexts[0].RequestMessages.ToList(); + Assert.Contains(firstRequestMessages, m => m.Text == "test"); + Assert.Contains(invokedContexts[0].ResponseMessages!, m => m.Contents.OfType().Any()); + + // Second invocation: request messages should NOT include the original user message (already notified). + // It should only include messages added since the first call (assistant tool call + tool result). + Assert.NotNull(invokedContexts[1].ResponseMessages); + var secondRequestMessages = invokedContexts[1].RequestMessages.ToList(); + Assert.DoesNotContain(secondRequestMessages, m => m.Text == "test"); + Assert.Contains(invokedContexts[1].ResponseMessages!, m => m.Text == "final response"); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled with streaming, + /// the ChatHistoryProvider receives messages after the stream completes. + /// + [Fact] + public async Task RunStreamingAsync_PersistsMessagesPerServiceCall_WhenOptionEnabledAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetStreamingResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(CreateAsyncEnumerableAsync( + new ChatResponseUpdate(ChatRole.Assistant, "streaming "), + new ChatResponseUpdate(ChatRole.Assistant, "response"))); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await foreach (var _ in agent.RunStreamingAsync([new(ChatRole.User, "test")], session)) + { + // Consume stream + } + + // Assert — InvokedCoreAsync should be called by the decorator + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.RequestMessages.Any(m => m.Text == "test") && + x.ResponseMessages != null), + ItExpr.IsAny()); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled, + /// AIContextProviders are also notified of new messages after a successful call. + /// + [Fact] + public async Task RunAsync_NotifiesAIContextProviders_WhenOptionEnabledAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockContextProvider = new(null, null, null); + mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); + mockContextProvider + .Protected() + .Setup>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask(new AIContext())); + mockContextProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + AIContextProviders = [mockContextProvider.Object], + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — InvokedCoreAsync should be called by the decorator for the AIContextProvider + mockContextProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.ResponseMessages != null && + x.ResponseMessages.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled and the service fails, + /// AIContextProviders are notified of the failure. + /// + [Fact] + public async Task RunAsync_NotifiesAIContextProvidersOfFailure_WhenOptionEnabledAsync() + { + // Arrange + var expectedException = new InvalidOperationException("Service failed"); + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ThrowsAsync(expectedException); + + Mock mockContextProvider = new(null, null, null); + mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); + mockContextProvider + .Protected() + .Setup>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask(new AIContext())); + mockContextProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + AIContextProviders = [mockContextProvider.Object], + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await Assert.ThrowsAsync(() => agent.RunAsync([new(ChatRole.User, "test")], session)); + + // Assert — the decorator should have notified the AIContextProvider of the failure + mockContextProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.InvokeException != null && + x.InvokeException.Message == "Service failed"), + ItExpr.IsAny()); + } + + /// + /// Verifies that when PersistChatHistoryAfterEachServiceCall is enabled, + /// both ChatHistoryProvider and AIContextProviders are notified together. + /// + [Fact] + public async Task RunAsync_NotifiesBothProviders_WhenOptionEnabledAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + Mock mockContextProvider = new(null, null, null); + mockContextProvider.SetupGet(p => p.StateKeys).Returns(["TestAIContextProvider"]); + mockContextProvider + .Protected() + .Setup>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask(new AIContext())); + mockContextProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + AIContextProviders = [mockContextProvider.Object], + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — both providers should have been notified + mockChatHistoryProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.ResponseMessages != null && + x.ResponseMessages.Any(m => m.Text == "response")), + ItExpr.IsAny()); + + mockContextProvider + .Protected() + .Verify("InvokedCoreAsync", Times.Once(), + ItExpr.Is(x => + x.ResponseMessages != null && + x.ResponseMessages.Any(m => m.Text == "response")), + ItExpr.IsAny()); + } + + /// + /// Verifies that during a FIC loop, response messages from the first call are not + /// re-notified as request messages on the second call. + /// + [Fact] + public async Task RunAsync_DoesNotReNotifyResponseMessagesAsRequestMessages_DuringFicLoopAsync() + { + // Arrange + int serviceCallCount = 0; + var assistantToolCallMessage = new ChatMessage(ChatRole.Assistant, [new FunctionCallContent("call1", "myTool", new Dictionary())]); + + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(() => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + return Task.FromResult(new ChatResponse([assistantToolCallMessage])); + } + + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, "final response")])); + }); + + var invokedContexts = new List(); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Callback((ChatHistoryProvider.InvokedContext ctx, CancellationToken _) => invokedContexts.Add(ctx)) + .Returns(() => new ValueTask()); + + var tool = AIFunctionFactory.Create(() => "tool result", "myTool", "A test tool"); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatOptions = new() { Tools = [tool] }, + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }, services: new ServiceCollection().BuildServiceProvider()); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert + Assert.Equal(2, invokedContexts.Count); + + // The assistant tool call message was a response in call 1 + Assert.Contains(invokedContexts[0].ResponseMessages!, m => ReferenceEquals(m, assistantToolCallMessage)); + + // It should NOT appear as a request in call 2 (it was already notified as a response) + var secondRequestMessages = invokedContexts[1].RequestMessages.ToList(); + Assert.DoesNotContain(secondRequestMessages, m => ReferenceEquals(m, assistantToolCallMessage)); + } + + /// + /// Verifies that when a failure occurs on the second call in a FIC loop, + /// only new request messages (not previously notified) are sent in the failure notification. + /// + [Fact] + public async Task RunAsync_DeduplicatesRequestMessages_OnFailureDuringFicLoopAsync() + { + // Arrange + int serviceCallCount = 0; + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .Returns(() => + { + serviceCallCount++; + if (serviceCallCount == 1) + { + return Task.FromResult(new ChatResponse([new(ChatRole.Assistant, [new FunctionCallContent("call1", "myTool", new Dictionary())])])); + } + + throw new InvalidOperationException("Service failure on second call"); + }); + + var invokedContexts = new List(); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Callback((ChatHistoryProvider.InvokedContext ctx, CancellationToken _) => invokedContexts.Add(ctx)) + .Returns(() => new ValueTask()); + + var tool = AIFunctionFactory.Create(() => "tool result", "myTool", "A test tool"); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatOptions = new() { Tools = [tool] }, + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }, services: new ServiceCollection().BuildServiceProvider()); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await Assert.ThrowsAsync(() => + agent.RunAsync([new(ChatRole.User, "test")], session)); + + // Assert — should have 2 notifications: success on call 1, failure on call 2 + Assert.Equal(2, invokedContexts.Count); + + // First notification: success, has user message as request + Assert.Null(invokedContexts[0].InvokeException); + Assert.Contains(invokedContexts[0].RequestMessages, m => m.Text == "test"); + + // Second notification: failure, should NOT include the user message (already notified) + Assert.NotNull(invokedContexts[1].InvokeException); + var failureRequestMessages = invokedContexts[1].RequestMessages.ToList(); + Assert.DoesNotContain(failureRequestMessages, m => m.Text == "test"); + } + + /// + /// Verifies that after a successful run with per-service-call persistence, the notified + /// messages are stamped with the persisted marker so they are not re-notified. + /// + [Fact] + public async Task RunAsync_MarksNotifiedMessages_WithPersistedMarkerAsync() + { + // Arrange + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())).ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")])); + + Mock mockChatHistoryProvider = new(null, null, null); + mockChatHistoryProvider.SetupGet(p => p.StateKeys).Returns(["TestChatHistoryProvider"]); + mockChatHistoryProvider + .Protected() + .Setup>>("InvokingCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns((ChatHistoryProvider.InvokingContext ctx, CancellationToken _) => + new ValueTask>(ctx.RequestMessages.ToList())); + mockChatHistoryProvider + .Protected() + .Setup("InvokedCoreAsync", ItExpr.IsAny(), ItExpr.IsAny()) + .Returns(() => new ValueTask()); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + ChatHistoryProvider = mockChatHistoryProvider.Object, + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var inputMessage = new ChatMessage(ChatRole.User, "test"); + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([inputMessage], session); + + // Assert — input message should be marked as persisted + Assert.True( + inputMessage.AdditionalProperties?.ContainsKey(ChatHistoryPersistingChatClient.PersistedMarkerKey) == true, + "Input message should be marked as persisted after a successful run."); + } + + /// + /// Verifies that when per-service-call persistence is enabled and the inner client returns a + /// conversation ID, the session's ConversationId is updated after the service call. + /// + [Fact] + public async Task RunAsync_UpdatesSessionConversationId_WhenPerServiceCallPersistenceEnabledAsync() + { + // Arrange + const string ExpectedConversationId = "conv-123"; + + Mock mockService = new(); + mockService.Setup( + s => s.GetResponseAsync( + It.IsAny>(), + It.IsAny(), + It.IsAny())) + .ReturnsAsync(new ChatResponse([new(ChatRole.Assistant, "response")]) + { + ConversationId = ExpectedConversationId, + }); + + ChatClientAgent agent = new(mockService.Object, options: new() + { + PersistChatHistoryAfterEachServiceCall = true, + }); + + // Act + var session = await agent.CreateSessionAsync() as ChatClientAgentSession; + await agent.RunAsync([new(ChatRole.User, "test")], session); + + // Assert — session should have the conversation ID returned by the inner client + Assert.Equal(ExpectedConversationId, session!.ConversationId); + } + + private static async IAsyncEnumerable CreateAsyncEnumerableAsync(params ChatResponseUpdate[] updates) + { + foreach (var update in updates) + { + yield return update; + } + + await Task.CompletedTask; + } +}