From 183b76a773e665f7327c67ad119445c960042c87 Mon Sep 17 00:00:00 2001 From: eanzhao Date: Thu, 20 Mar 2025 11:42:13 +0800 Subject: [PATCH 1/6] feat: add EventDispatcherWorkerGrain to dispatch events to children. --- src/Aevatar.Core/AevatarGAgentConstants.cs | 1 + .../EventDispatcherWorkerGrain.cs | 34 +++++++++++++++++++ .../IEventDispatcherWorkerGrain.cs | 10 ++++++ src/Aevatar.Core/GAgentBase.Publish.cs | 8 ++--- src/Aevatar.Core/GAgentBase.cs | 6 ---- 5 files changed, 49 insertions(+), 10 deletions(-) create mode 100644 src/Aevatar.Core/EventDispatch/EventDispatcherWorkerGrain.cs create mode 100644 src/Aevatar.Core/EventDispatch/IEventDispatcherWorkerGrain.cs diff --git a/src/Aevatar.Core/AevatarGAgentConstants.cs b/src/Aevatar.Core/AevatarGAgentConstants.cs index ca57481b..edb72a60 100644 --- a/src/Aevatar.Core/AevatarGAgentConstants.cs +++ b/src/Aevatar.Core/AevatarGAgentConstants.cs @@ -6,4 +6,5 @@ public static class AevatarGAgentConstants public const string StateHandlerDefaultMethodName = "HandleStateAsync"; public const string ConfigDefaultMethodName = "PerformConfigAsync"; public const string ForwardEventMethodName = "ForwardEventAsync"; + public const int EventDispatcherMaxBatchSize = 20; } \ No newline at end of file diff --git a/src/Aevatar.Core/EventDispatch/EventDispatcherWorkerGrain.cs b/src/Aevatar.Core/EventDispatch/EventDispatcherWorkerGrain.cs new file mode 100644 index 00000000..8c07972f --- /dev/null +++ b/src/Aevatar.Core/EventDispatch/EventDispatcherWorkerGrain.cs @@ -0,0 +1,34 @@ +using Aevatar.Core.Abstractions; +using Orleans.Streams; + +namespace Aevatar.Core.EventDispatch; + +public class EventDispatcherWorkerGrain : Grain, IEventDispatcherWorkerGrain +{ + public async Task ExecuteDispatchAsync(IReadOnlyList> streams, + EventWrapper eventWrapper) where T : EventBase + { + if (streams.Count <= AevatarGAgentConstants.EventDispatcherMaxBatchSize) + { + var tasks = streams.Select(s => s.GetAllSubscriptionHandles()); + await Task.WhenAll(tasks); + } + else + { + var batches = streams + .Select((x, i) => new { Index = i, Value = x }) + .GroupBy(x => x.Index / AevatarGAgentConstants.EventDispatcherMaxBatchSize) + .Select(g => g.Select(x => x.Value).ToList()) + .ToList(); + var subTasks = batches.Select(stream => + { + var worker = GrainFactory.GetGrain( + Guid.NewGuid() + ); + return worker.ExecuteDispatchAsync(stream, eventWrapper); + }); + + await Task.WhenAll(subTasks); + } + } +} \ No newline at end of file diff --git a/src/Aevatar.Core/EventDispatch/IEventDispatcherWorkerGrain.cs b/src/Aevatar.Core/EventDispatch/IEventDispatcherWorkerGrain.cs new file mode 100644 index 00000000..ce5d86e8 --- /dev/null +++ b/src/Aevatar.Core/EventDispatch/IEventDispatcherWorkerGrain.cs @@ -0,0 +1,10 @@ +using Aevatar.Core.Abstractions; +using Orleans.Streams; + +namespace Aevatar.Core.EventDispatch; + +public interface IEventDispatcherWorkerGrain : IGrainWithGuidKey +{ + Task ExecuteDispatchAsync(IReadOnlyList> streams, EventWrapper eventWrapper) + where T : EventBase; +} \ No newline at end of file diff --git a/src/Aevatar.Core/GAgentBase.Publish.cs b/src/Aevatar.Core/GAgentBase.Publish.cs index 1754fb8f..6df1dee0 100644 --- a/src/Aevatar.Core/GAgentBase.Publish.cs +++ b/src/Aevatar.Core/GAgentBase.Publish.cs @@ -1,5 +1,6 @@ using Aevatar.Core.Abstractions; using Aevatar.Core.Abstractions.Exceptions; +using Aevatar.Core.EventDispatch; using Microsoft.Extensions.Logging; using Newtonsoft.Json; @@ -124,10 +125,9 @@ private async Task SendEventDownwardsAsync(EventWrapper eventWrapper) wher try { - foreach (var stream in State.Children.Select(GetEventBaseStream)) - { - await stream.OnNextAsync(eventWrapper); - } + var streams = State.Children.Select(GetEventBaseStream).ToArray(); + var dispatcherWorkerGrain = GrainFactory.GetGrain(Guid.Empty); + await dispatcherWorkerGrain.ExecuteDispatchAsync(streams, eventWrapper); } catch (Exception ex) { diff --git a/src/Aevatar.Core/GAgentBase.cs b/src/Aevatar.Core/GAgentBase.cs index e4d5be44..e7c60a74 100644 --- a/src/Aevatar.Core/GAgentBase.cs +++ b/src/Aevatar.Core/GAgentBase.cs @@ -336,10 +336,4 @@ private IAsyncStream GetEventBaseStream(GrainId grainId) var streamId = StreamId.Create(AevatarOptions!.StreamNamespace, grainIdString); return StreamProvider.GetStream(streamId); } - - private IAsyncStream> GetStateProjectionStream() - { - var streamId = StreamId.Create(AevatarOptions!.StreamNamespace, typeof(StateWrapper).FullName!); - return StreamProvider.GetStream>(streamId); - } } \ No newline at end of file From ca9c50e03b060354bcb65c2a3a7361f5cf3d4aac Mon Sep 17 00:00:00 2001 From: eanzhao Date: Fri, 21 Mar 2025 17:13:59 +0800 Subject: [PATCH 2/6] feat: add a new stream for children on GAgentBase. --- .../Extensions/OrleansHostExtension.cs | 6 ++ src/Aevatar.Core.Abstractions/IStateAgent.cs | 13 ++++ .../EventDispatcherWorkerGrain.cs | 34 ---------- .../IEventDispatcherWorkerGrain.cs | 10 --- src/Aevatar.Core/GAgentBase.Publish.cs | 10 ++- src/Aevatar.Core/GAgentBase.cs | 65 ++++++++++++++++--- .../TestGAgents/ChildTestGAgent.cs | 37 +++++++++++ .../TestGAgents/EventHandlerTestGAgent.cs | 11 ++-- .../SubscriptionTests.cs | 60 +++++++++++++++++ test/Aevatar.TestBase/ClusterFixture.cs | 1 + 10 files changed, 180 insertions(+), 67 deletions(-) delete mode 100644 src/Aevatar.Core/EventDispatch/EventDispatcherWorkerGrain.cs delete mode 100644 src/Aevatar.Core/EventDispatch/IEventDispatcherWorkerGrain.cs create mode 100644 test/Aevatar.Core.Tests/TestGAgents/ChildTestGAgent.cs create mode 100644 test/Aevatar.GAgents.Tests/SubscriptionTests.cs diff --git a/samples/PluginGAgent/PluginGAgent.Silo/Extensions/OrleansHostExtension.cs b/samples/PluginGAgent/PluginGAgent.Silo/Extensions/OrleansHostExtension.cs index 8c89370c..616a5a60 100644 --- a/samples/PluginGAgent/PluginGAgent.Silo/Extensions/OrleansHostExtension.cs +++ b/samples/PluginGAgent/PluginGAgent.Silo/Extensions/OrleansHostExtension.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using MongoDB.Driver; +using Orleans.Configuration; namespace PluginGAgent.Silo.Extensions; @@ -30,6 +31,11 @@ public static IHostBuilder UseOrleansConfiguration(this IHostBuilder hostBuilder }) .AddMemoryStreams("Aevatar") .UseAevatarPermissionManagement() + .Configure(options => + { + options.ClusterId = "default"; + options.ServiceId = "testServiceId"; + }) .UseAevatar() ; }) diff --git a/src/Aevatar.Core.Abstractions/IStateAgent.cs b/src/Aevatar.Core.Abstractions/IStateAgent.cs index 30d5d3da..fab85c03 100644 --- a/src/Aevatar.Core.Abstractions/IStateAgent.cs +++ b/src/Aevatar.Core.Abstractions/IStateAgent.cs @@ -1,4 +1,5 @@ using Orleans.Concurrency; +using Orleans.Streams; namespace Aevatar.Core.Abstractions; @@ -79,6 +80,18 @@ public interface IGAgent : IGrainWithGuidKey /// /// Task ConfigAsync(ConfigurationBase configuration); + + /// + /// Get GAgentAsyncObserver + /// + /// + Task> GetGAgentAsyncObserverAsync(); + + /// + /// Resume subscription of parent's stream. + /// + /// + Task ResumeSubscriptionAsync(IAsyncStream stream); } public interface IStateGAgent : IGAgent diff --git a/src/Aevatar.Core/EventDispatch/EventDispatcherWorkerGrain.cs b/src/Aevatar.Core/EventDispatch/EventDispatcherWorkerGrain.cs deleted file mode 100644 index 8f534378..00000000 --- a/src/Aevatar.Core/EventDispatch/EventDispatcherWorkerGrain.cs +++ /dev/null @@ -1,34 +0,0 @@ -using Aevatar.Core.Abstractions; -using Orleans.Streams; - -namespace Aevatar.Core.EventDispatch; - -public class EventDispatcherWorkerGrain : Grain, IEventDispatcherWorkerGrain -{ - public async Task ExecuteDispatchAsync(IReadOnlyList> streams, - EventWrapper eventWrapper) where T : EventBase - { - if (streams.Count <= AevatarGAgentConstants.EventDispatcherMaxBatchSize) - { - var tasks = streams.Select(s => s.OnNextAsync(eventWrapper)); - await Task.WhenAll(tasks); - } - else - { - var batches = streams - .Select((x, i) => new { Index = i, Value = x }) - .GroupBy(x => x.Index / AevatarGAgentConstants.EventDispatcherMaxBatchSize) - .Select(g => g.Select(x => x.Value).ToList()) - .ToList(); - var subTasks = batches.Select(stream => - { - var worker = GrainFactory.GetGrain( - Guid.NewGuid() - ); - return worker.ExecuteDispatchAsync(stream, eventWrapper); - }); - - await Task.WhenAll(subTasks); - } - } -} \ No newline at end of file diff --git a/src/Aevatar.Core/EventDispatch/IEventDispatcherWorkerGrain.cs b/src/Aevatar.Core/EventDispatch/IEventDispatcherWorkerGrain.cs deleted file mode 100644 index ce5d86e8..00000000 --- a/src/Aevatar.Core/EventDispatch/IEventDispatcherWorkerGrain.cs +++ /dev/null @@ -1,10 +0,0 @@ -using Aevatar.Core.Abstractions; -using Orleans.Streams; - -namespace Aevatar.Core.EventDispatch; - -public interface IEventDispatcherWorkerGrain : IGrainWithGuidKey -{ - Task ExecuteDispatchAsync(IReadOnlyList> streams, EventWrapper eventWrapper) - where T : EventBase; -} \ No newline at end of file diff --git a/src/Aevatar.Core/GAgentBase.Publish.cs b/src/Aevatar.Core/GAgentBase.Publish.cs index 6df1dee0..7efc36ca 100644 --- a/src/Aevatar.Core/GAgentBase.Publish.cs +++ b/src/Aevatar.Core/GAgentBase.Publish.cs @@ -1,6 +1,5 @@ using Aevatar.Core.Abstractions; using Aevatar.Core.Abstractions.Exceptions; -using Aevatar.Core.EventDispatch; using Microsoft.Extensions.Logging; using Newtonsoft.Json; @@ -86,7 +85,7 @@ private async Task SendEventUpwardsAsync(EventWrapper eventWrapper) where try { - var stream = GetEventBaseStream(State.Parent.Value); + var stream = GetEventBaseStreamForOwn(State.Parent.Value); await stream.OnNextAsync(eventWrapper); } catch (Exception ex) @@ -103,7 +102,7 @@ private async Task SendEventToSelfAsync(EventWrapper eventWrapper) where T $"{GrainId.ToString()} is sending event to self: {JsonConvert.SerializeObject(eventWrapper)}"); try { - var streamOfThisGAgent = GetEventBaseStream(GrainId); + var streamOfThisGAgent = GetEventBaseStreamForOwn(GrainId); await streamOfThisGAgent.OnNextAsync(eventWrapper); } catch (Exception ex) @@ -125,9 +124,8 @@ private async Task SendEventDownwardsAsync(EventWrapper eventWrapper) wher try { - var streams = State.Children.Select(GetEventBaseStream).ToArray(); - var dispatcherWorkerGrain = GrainFactory.GetGrain(Guid.Empty); - await dispatcherWorkerGrain.ExecuteDispatchAsync(streams, eventWrapper); + var streamForChildren = GetEventBaseStreamForChildren(GrainId); + await streamForChildren.OnNextAsync(eventWrapper); } catch (Exception ex) { diff --git a/src/Aevatar.Core/GAgentBase.cs b/src/Aevatar.Core/GAgentBase.cs index ae90adde..2ab5e273 100644 --- a/src/Aevatar.Core/GAgentBase.cs +++ b/src/Aevatar.Core/GAgentBase.cs @@ -1,4 +1,3 @@ -using System.Collections.Concurrent; using Aevatar.Core.Abstractions; using Aevatar.Core.Abstractions.Projections; using Microsoft.Extensions.DependencyInjection; @@ -6,6 +5,7 @@ using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Extensions.Options; using Newtonsoft.Json; +using Nito.AsyncEx; using Orleans.EventSourcing; using Orleans.Providers; using Orleans.Streams; @@ -45,7 +45,11 @@ public abstract partial class private Lazy LazyStreamProvider => new(() => this.GetStreamProvider(AevatarCoreConstants.StreamProvider)); + private Lazy LazyGAgentFactory => new(() + => ServiceProvider.GetRequiredService()); + protected IStreamProvider StreamProvider => LazyStreamProvider.Value; + protected IGAgentFactory GAgentFactory => LazyGAgentFactory.Value; public ILogger Logger { get; set; } = NullLogger.Instance; @@ -72,14 +76,18 @@ public async Task RegisterAsync(IGAgent gAgent) await OnRegisterAgentAsync(gAgent.GetGrainId()); } - public Task SubscribeToAsync(IGAgent gAgent) + public async Task SubscribeToAsync(IGAgent gAgent) { - return SetParentAsync(gAgent.GetGrainId()); + var parentGrainId = gAgent.GetGrainId(); + var parentStream = GetEventBaseStreamForChildren(parentGrainId); + var asyncObserver = new GAgentAsyncObserver(_observers); + await ResumeOrSubscribeAsync(parentStream, asyncObserver); + await SetParentAsync(gAgent.GetGrainId()); } - public Task UnsubscribeFromAsync(IGAgent gAgent) + public async Task UnsubscribeFromAsync(IGAgent gAgent) { - return ClearParentAsync(gAgent.GetGrainId()); + await ClearParentAsync(gAgent.GetGrainId()); } public async Task UnregisterAsync(IGAgent gAgent) @@ -89,7 +97,7 @@ public async Task UnregisterAsync(IGAgent gAgent) await OnUnregisterAgentAsync(gAgent.GetGrainId()); } - public virtual Task?> GetAllSubscribedEventsAsync(bool includeBaseHandlers = false) + public async virtual Task?> GetAllSubscribedEventsAsync(bool includeBaseHandlers = false) { var eventHandlerMethods = GetEventHandlerMethods(GetType()); eventHandlerMethods = eventHandlerMethods.Where(m => @@ -101,7 +109,7 @@ public async Task UnregisterAsync(IGAgent gAgent) handlingTypes = handlingTypes.Where(t => t != typeof(RequestAllSubscriptionsEvent)); } - return Task.FromResult(handlingTypes.ToList())!; + return handlingTypes.ToList(); } public Task> GetChildrenAsync() @@ -127,6 +135,18 @@ public async Task ConfigAsync(ConfigurationBase configuration) } } + public async Task> GetGAgentAsyncObserverAsync() + { + var asyncObserver = new GAgentAsyncObserver(_observers); + return asyncObserver; + } + + public async Task ResumeSubscriptionAsync(IAsyncStream stream) + { + var asyncObserver = new GAgentAsyncObserver(_observers); + await ResumeOrSubscribeAsync(stream, asyncObserver); + } + protected virtual Task PerformConfigAsync(TConfiguration configuration) { return Task.CompletedTask; @@ -238,6 +258,7 @@ private async Task BaseOnActivateAsync(CancellationToken cancellationToken) var initTasks = new[] { InitializeOrResumeEventBaseStreamAsync(), + ResumeEventBaseStreamForChildrenAsync(), ActivateProjectionGrainAsync() }; await Task.WhenAll(initTasks); @@ -245,11 +266,24 @@ private async Task BaseOnActivateAsync(CancellationToken cancellationToken) private async Task InitializeOrResumeEventBaseStreamAsync() { - var streamOfThisGAgent = GetEventBaseStream(this.GetGrainId()); + var streamOfThisGAgent = GetEventBaseStreamForOwn(this.GetGrainId()); var asyncObserver = new GAgentAsyncObserver(_observers); await ResumeOrSubscribeAsync(streamOfThisGAgent, asyncObserver); } + private async Task ResumeEventBaseStreamForChildrenAsync() + { + var streamForChildren = GetEventBaseStreamForChildren(this.GetGrainId()); + var tasks = new List(); + foreach (var childGrainId in State.Children) + { + var child = await GAgentFactory.GetGAgentAsync(childGrainId); + tasks.Add(child.ResumeSubscriptionAsync(streamForChildren)); + } + + await tasks.WhenAll(); + } + private async Task ActivateProjectionGrainAsync() { var projectionGrain = GrainFactory.GetGrain(typeof(TState).FullName); @@ -330,10 +364,21 @@ protected virtual Task HandleRaiseEventAsync() return Task.CompletedTask; } - private IAsyncStream GetEventBaseStream(GrainId grainId) + private IAsyncStream GetEventBaseStreamForOwn(GrainId grainId) { var grainIdString = grainId.ToString(); - var streamId = StreamId.Create(AevatarOptions!.StreamNamespace, grainIdString); + return GetEventBaseStream(grainIdString); + } + + private IAsyncStream GetEventBaseStreamForChildren(GrainId grainId) + { + var grainIdString = $"{grainId.ToString()}/Children"; + return GetEventBaseStream(grainIdString); + } + + private IAsyncStream GetEventBaseStream(string streamKey) + { + var streamId = StreamId.Create(AevatarOptions!.StreamNamespace, streamKey); return StreamProvider.GetStream(streamId); } } \ No newline at end of file diff --git a/test/Aevatar.Core.Tests/TestGAgents/ChildTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/ChildTestGAgent.cs new file mode 100644 index 00000000..b9d59a71 --- /dev/null +++ b/test/Aevatar.Core.Tests/TestGAgents/ChildTestGAgent.cs @@ -0,0 +1,37 @@ +using Aevatar.Core.Abstractions; +using Aevatar.Core.Tests.TestEvents; + +namespace Aevatar.Core.Tests.TestGAgents; + +[GenerateSerializer] + +public class ChildTestGAgentState : StateBase +{ + [Id(0)] public List Content { get; set; } = []; +} + +[GenerateSerializer] +public class ChildTestStateLogEvent : StateLogEventBase; + +public interface IChildTestGAgent : IStateGAgent; + +[GAgent] +public class ChildTestGAgent : GAgentBase, IChildTestGAgent +{ + public override Task GetDescriptionAsync() + { + return Task.FromResult("This is a GAgent for testing child."); + } + + [EventHandler] + public Task ExecuteAsync(NaiveTestEvent eventData) + { + AddContent(eventData.Greeting); + return Task.CompletedTask; + } + + private void AddContent(string content) + { + State.Content.Add(content); + } +} \ No newline at end of file diff --git a/test/Aevatar.Core.Tests/TestGAgents/EventHandlerTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/EventHandlerTestGAgent.cs index 2c2d9d21..dcb25fee 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/EventHandlerTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/EventHandlerTestGAgent.cs @@ -7,13 +7,15 @@ namespace Aevatar.Core.Tests.TestGAgents; [GenerateSerializer] public class EventHandlerTestGAgentState : StateBase { - [Id(0)] public List Content { get; set; } + [Id(0)] public List Content { get; set; } = []; } public class EventHandlerTestStateLogEvent : StateLogEventBase; +public interface IEventHandlerTestGAgent : IStateGAgent; + [GAgent("eventHandlerTest", "test")] -public class EventHandlerTestGAgent : GAgentBase +public class EventHandlerTestGAgent : GAgentBase, IEventHandlerTestGAgent { public override Task GetDescriptionAsync() { @@ -55,11 +57,6 @@ public Task HandleEventWithExceptionAsync(NaiveTestEvent eventData) private void AddContent(string content) { - if (State.Content.IsNullOrEmpty()) - { - State.Content = []; - } - State.Content.Add(content); } } \ No newline at end of file diff --git a/test/Aevatar.GAgents.Tests/SubscriptionTests.cs b/test/Aevatar.GAgents.Tests/SubscriptionTests.cs new file mode 100644 index 00000000..a3b5d258 --- /dev/null +++ b/test/Aevatar.GAgents.Tests/SubscriptionTests.cs @@ -0,0 +1,60 @@ +using System.Diagnostics; +using Aevatar.Core.Abstractions; +using Aevatar.Core.Tests.TestEvents; +using Aevatar.Core.Tests.TestGAgents; +using Shouldly; +using Xunit.Abstractions; + +namespace Aevatar.GAgents.Tests; + +public sealed class SubscriptionTests : AevatarGAgentsTestBase +{ + private readonly ITestOutputHelper _outputHelper; + private readonly IGAgentFactory _gAgentFactory; + + public SubscriptionTests(ITestOutputHelper outputHelper) + { + _outputHelper = outputHelper; + _gAgentFactory = GetRequiredService(); + } + + [Theory] + [InlineData(10)] + [InlineData(100)] + public async Task ManyChildrenTest(int count) + { + var stopWatch = new Stopwatch(); + stopWatch.Start(); + var children = new List(); + for (var i = 0; i < count; i++) + { + var child = await _gAgentFactory.GetGAgentAsync(); + children.Add(child); + } + + var groupGAgent = await _gAgentFactory.GetGAgentAsync>(Guid.NewGuid()); + foreach (var child in children) + { + await groupGAgent.RegisterAsync(child); + } + + var publishingGAgent = await _gAgentFactory.GetGAgentAsync(); + await publishingGAgent.RegisterAsync(groupGAgent); + await publishingGAgent.PublishEventAsync(new NaiveTestEvent + { + Greeting = "test" + }); + + await Task.Delay(10000); + + foreach (var child in children.AsParallel()) + { + var state = await child.GetStateAsync(); + state.Content.Count.ShouldBe(1); + state.Content[0].ShouldBe("test"); + } + + stopWatch.Stop(); + _outputHelper.WriteLine($"Time: {stopWatch.ElapsedMilliseconds}ms"); + } +} \ No newline at end of file diff --git a/test/Aevatar.TestBase/ClusterFixture.cs b/test/Aevatar.TestBase/ClusterFixture.cs index 3b6b36dd..733e7f8a 100644 --- a/test/Aevatar.TestBase/ClusterFixture.cs +++ b/test/Aevatar.TestBase/ClusterFixture.cs @@ -101,6 +101,7 @@ public void Configure(ISiloBuilder hostBuilder) services.AddSingleton(grainTypeMap); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); }) .UseAevatar() .UseAevatarPermissionManagement() From b76ea1bcc87a27e452c1a343516e890b0215a6f9 Mon Sep 17 00:00:00 2001 From: eanzhao Date: Mon, 24 Mar 2025 18:03:37 +0800 Subject: [PATCH 3/6] feat: add EventPubGrain. --- src/Aevatar.Core.Abstractions/IEventPubGrain.cs | 6 ++++++ src/Aevatar.Core/EventPubGrain.cs | 12 ++++++++++++ 2 files changed, 18 insertions(+) create mode 100644 src/Aevatar.Core.Abstractions/IEventPubGrain.cs create mode 100644 src/Aevatar.Core/EventPubGrain.cs diff --git a/src/Aevatar.Core.Abstractions/IEventPubGrain.cs b/src/Aevatar.Core.Abstractions/IEventPubGrain.cs new file mode 100644 index 00000000..b9c7edd5 --- /dev/null +++ b/src/Aevatar.Core.Abstractions/IEventPubGrain.cs @@ -0,0 +1,6 @@ +namespace Aevatar.Core.Abstractions; + +public interface IEventPubGrain : IGrainWithGuidCompoundKey +{ + +} \ No newline at end of file diff --git a/src/Aevatar.Core/EventPubGrain.cs b/src/Aevatar.Core/EventPubGrain.cs new file mode 100644 index 00000000..04500ce8 --- /dev/null +++ b/src/Aevatar.Core/EventPubGrain.cs @@ -0,0 +1,12 @@ +using Aevatar.Core.Abstractions; + +namespace Aevatar.Core; + +public class EventPubGrain : Grain, IEventPubGrain +{ + public override Task OnActivateAsync(CancellationToken cancellationToken) + { + var parentGrainId = this.GetGrainId().GetGuidKey(); + return base.OnActivateAsync(cancellationToken); + } +} \ No newline at end of file From 6b454386574f2dbe62713b6617cb827c8c90b0df Mon Sep 17 00:00:00 2001 From: eanzhao Date: Tue, 1 Apr 2025 15:18:57 +0800 Subject: [PATCH 4/6] feat: framework of event pub grains --- .../AevatarCoreConstants.cs | 1 + .../AevatarCoreStreamConfig.cs | 14 ++ .../IEventPubChildrenGroupGrain.cs | 11 + .../EventPublish/IEventPubGrain.cs | 6 + .../EventPublish/IStreamCoordinatorGrain.cs | 13 ++ .../IEventPubGrain.cs | 6 - src/Aevatar.Core.Abstractions/StateBase.cs | 1 + src/Aevatar.Core/AevatarGAgentConstants.cs | 2 +- src/Aevatar.Core/EventPubGrain.cs | 12 - .../EventPubChildrenGroupGrain.cs | 96 ++++++++ .../EventPublish/EventPubGrain.cs | 25 ++ .../EventPublish/StreamCoordinatorGrain.cs | 218 ++++++++++++++++++ .../Extensions/StreamExtensions.cs | 16 ++ src/Aevatar.Core/GAgentBase.Publish.cs | 95 +------- src/Aevatar.Core/GAgentBase.Subscribe.cs | 27 ++- src/Aevatar.Core/GAgentBase.cs | 76 +++--- .../Extensions/OrleansHostExtensions.cs | 4 + test/Aevatar.Core.Tests/EventHandlingTests.cs | 8 +- test/Aevatar.Core.Tests/EventSourcingTests.cs | 2 +- test/Aevatar.Core.Tests/GAgentTestKitBase.cs | 2 +- test/Aevatar.Core.Tests/GroupingTests.cs | 11 +- test/Aevatar.Core.Tests/PublishingTests.cs | 8 +- .../TestGAgents/DeveloperTestGAgent.cs | 7 - .../TestGAgents/DevelopingLeaderTestGAgent.cs | 5 - .../TestGAgents/InvestorTestGAgent.cs | 7 +- .../TestGAgents/MarketingLeaderTestGAgent.cs | 11 +- .../TestGAgents/NaiveTestGAgent.cs | 2 +- test/Aevatar.GAgents.Tests/GAgentBaseTests.cs | 10 +- .../GAgentEventPubTests.cs | 55 +++++ test/OrleansTestKit/TestGrainFactory.cs | 15 +- test/OrleansTestKit/TestKitSilo.cs | 12 + 31 files changed, 557 insertions(+), 221 deletions(-) create mode 100644 src/Aevatar.Core.Abstractions/AevatarCoreStreamConfig.cs create mode 100644 src/Aevatar.Core.Abstractions/EventPublish/IEventPubChildrenGroupGrain.cs create mode 100644 src/Aevatar.Core.Abstractions/EventPublish/IEventPubGrain.cs create mode 100644 src/Aevatar.Core.Abstractions/EventPublish/IStreamCoordinatorGrain.cs delete mode 100644 src/Aevatar.Core.Abstractions/IEventPubGrain.cs delete mode 100644 src/Aevatar.Core/EventPubGrain.cs create mode 100644 src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs create mode 100644 src/Aevatar.Core/EventPublish/EventPubGrain.cs create mode 100644 src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs create mode 100644 src/Aevatar.Core/Extensions/StreamExtensions.cs create mode 100644 test/Aevatar.GAgents.Tests/GAgentEventPubTests.cs diff --git a/src/Aevatar.Core.Abstractions/AevatarCoreConstants.cs b/src/Aevatar.Core.Abstractions/AevatarCoreConstants.cs index f47f267a..14adaf38 100644 --- a/src/Aevatar.Core.Abstractions/AevatarCoreConstants.cs +++ b/src/Aevatar.Core.Abstractions/AevatarCoreConstants.cs @@ -3,5 +3,6 @@ namespace Aevatar.Core.Abstractions; public class AevatarCoreConstants { public const string StreamProvider = "Aevatar"; + public const string DefaultStreamNamespace = "Aevatar"; public const char GAgentNamespaceSeparator = '.'; } \ No newline at end of file diff --git a/src/Aevatar.Core.Abstractions/AevatarCoreStreamConfig.cs b/src/Aevatar.Core.Abstractions/AevatarCoreStreamConfig.cs new file mode 100644 index 00000000..2edacebe --- /dev/null +++ b/src/Aevatar.Core.Abstractions/AevatarCoreStreamConfig.cs @@ -0,0 +1,14 @@ +namespace Aevatar.Core.Abstractions; + +public static class AevatarCoreStreamConfig +{ + public static string Prefix { get; private set; } = AevatarCoreConstants.DefaultStreamNamespace; + + public static void Initialize(string? prefix) + { + if (prefix != null) + { + Prefix = prefix; + } + } +} \ No newline at end of file diff --git a/src/Aevatar.Core.Abstractions/EventPublish/IEventPubChildrenGroupGrain.cs b/src/Aevatar.Core.Abstractions/EventPublish/IEventPubChildrenGroupGrain.cs new file mode 100644 index 00000000..b7cbe081 --- /dev/null +++ b/src/Aevatar.Core.Abstractions/EventPublish/IEventPubChildrenGroupGrain.cs @@ -0,0 +1,11 @@ +namespace Aevatar.Core.Abstractions.EventPublish; + +public interface IEventPubChildrenGroupGrain : IGrainWithIntegerCompoundKey +{ + Task DownwardsEventAsync(EventWrapperBase eventWrapper); + Task UpwardsEventAsync(EventWrapperBase eventWrapper); + Task AddChildAsync(GrainId childId); + Task RemoveChildAsync(GrainId childId); + Task> GetChildrenAsync(); + Task GetChildrenCountAsync(); +} \ No newline at end of file diff --git a/src/Aevatar.Core.Abstractions/EventPublish/IEventPubGrain.cs b/src/Aevatar.Core.Abstractions/EventPublish/IEventPubGrain.cs new file mode 100644 index 00000000..95ebc35c --- /dev/null +++ b/src/Aevatar.Core.Abstractions/EventPublish/IEventPubGrain.cs @@ -0,0 +1,6 @@ +namespace Aevatar.Core.Abstractions.EventPublish; + +public interface IEventPubGrain : IGrainWithStringKey +{ + Task PublishEventAsync(EventWrapperBase eventWrapper); +} \ No newline at end of file diff --git a/src/Aevatar.Core.Abstractions/EventPublish/IStreamCoordinatorGrain.cs b/src/Aevatar.Core.Abstractions/EventPublish/IStreamCoordinatorGrain.cs new file mode 100644 index 00000000..69076020 --- /dev/null +++ b/src/Aevatar.Core.Abstractions/EventPublish/IStreamCoordinatorGrain.cs @@ -0,0 +1,13 @@ +namespace Aevatar.Core.Abstractions.EventPublish; + +public interface IStreamCoordinatorGrain : IGrainWithStringKey +{ + Task SetParentAsync(GrainId parentGrainId); + Task GetParentAsync(); + Task RegisterChildAsync(GrainId childGrainId); + Task UnregisterChildAsync(GrainId childGrainId); + Task> GetChildrenAsync(); + Task PublishEventAsync(EventWrapperBase eventWrapper); + Task DownwardsEventAsync(EventWrapperBase eventWrapper); + Task UpwardsEventAsync(EventWrapperBase eventWrapper); +} \ No newline at end of file diff --git a/src/Aevatar.Core.Abstractions/IEventPubGrain.cs b/src/Aevatar.Core.Abstractions/IEventPubGrain.cs deleted file mode 100644 index b9c7edd5..00000000 --- a/src/Aevatar.Core.Abstractions/IEventPubGrain.cs +++ /dev/null @@ -1,6 +0,0 @@ -namespace Aevatar.Core.Abstractions; - -public interface IEventPubGrain : IGrainWithGuidCompoundKey -{ - -} \ No newline at end of file diff --git a/src/Aevatar.Core.Abstractions/StateBase.cs b/src/Aevatar.Core.Abstractions/StateBase.cs index f476b423..1f8aaeee 100644 --- a/src/Aevatar.Core.Abstractions/StateBase.cs +++ b/src/Aevatar.Core.Abstractions/StateBase.cs @@ -6,6 +6,7 @@ public abstract class StateBase [Id(0)] public List Children { get; set; } = []; [Id(1)] public GrainId? Parent { get; set; } [Id(2)] public string? GAgentCreator { get; set; } + [Id(3)] public GrainId StreamCoordinatorGrainId { get; set; } public void Apply(StateLogEventBase @stateLogEvent) { diff --git a/src/Aevatar.Core/AevatarGAgentConstants.cs b/src/Aevatar.Core/AevatarGAgentConstants.cs index edb72a60..b1de366f 100644 --- a/src/Aevatar.Core/AevatarGAgentConstants.cs +++ b/src/Aevatar.Core/AevatarGAgentConstants.cs @@ -6,5 +6,5 @@ public static class AevatarGAgentConstants public const string StateHandlerDefaultMethodName = "HandleStateAsync"; public const string ConfigDefaultMethodName = "PerformConfigAsync"; public const string ForwardEventMethodName = "ForwardEventAsync"; - public const int EventDispatcherMaxBatchSize = 20; + public const int MaxChildrenPerGroup = 1000; } \ No newline at end of file diff --git a/src/Aevatar.Core/EventPubGrain.cs b/src/Aevatar.Core/EventPubGrain.cs deleted file mode 100644 index 04500ce8..00000000 --- a/src/Aevatar.Core/EventPubGrain.cs +++ /dev/null @@ -1,12 +0,0 @@ -using Aevatar.Core.Abstractions; - -namespace Aevatar.Core; - -public class EventPubGrain : Grain, IEventPubGrain -{ - public override Task OnActivateAsync(CancellationToken cancellationToken) - { - var parentGrainId = this.GetGrainId().GetGuidKey(); - return base.OnActivateAsync(cancellationToken); - } -} \ No newline at end of file diff --git a/src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs b/src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs new file mode 100644 index 00000000..471efff7 --- /dev/null +++ b/src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs @@ -0,0 +1,96 @@ +using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; +using Aevatar.Core.Extensions; +using Orleans.EventSourcing; +using Orleans.Providers; +using Orleans.Streams; + +[GenerateSerializer] +public class EventPubChildrenGroupState : StateBase +{ + [Id(0)] public List Children { get; set; } = new(); + [Id(1)] public int ChildrenCount { get; set; } +} + +[GenerateSerializer] +public class EventPubChildrenGroupStateLogEvent : StateLogEventBase; + +[StorageProvider(ProviderName = "PubSubStore")] +[LogConsistencyProvider(ProviderName = "LogStorage")] +public class EventPubChildrenGroupGrain : + JournaledGrain>, + IEventPubChildrenGroupGrain +{ + private readonly IStreamProvider _streamProvider; + + public EventPubChildrenGroupGrain() + { + _streamProvider = this.GetStreamProvider(AevatarCoreConstants.StreamProvider); + } + + public async Task DownwardsEventAsync(EventWrapperBase eventWrapper) + { + foreach (var eventPubGrain in State.Children.Select(grainId => + GrainFactory.GetGrain(grainId.ToString()))) + { + await eventPubGrain.PublishEventAsync(eventWrapper); + } + } + + public async Task UpwardsEventAsync(EventWrapperBase eventWrapper) + { + var stream = _streamProvider.GetEventWrapperBaseStream(this.GetPrimaryKeyString()); + await stream.OnNextAsync(eventWrapper); + } + + public async Task AddChildAsync(GrainId childId) + { + RaiseEvent(new AddChildStateLogEvent { ChildId = childId }); + await ConfirmEvents(); + } + + public async Task RemoveChildAsync(GrainId childId) + { + RaiseEvent(new RemoveChildStateLogEvent { ChildId = childId }); + await ConfirmEvents(); + } + + public Task> GetChildrenAsync() + { + return Task.FromResult(State.Children); + } + + public Task GetChildrenCountAsync() + { + return Task.FromResult(State.ChildrenCount); + } + + protected override void TransitionState(EventPubChildrenGroupState state, StateLogEventBase @event) + { + switch (@event) + { + case AddChildStateLogEvent addChildEvent: + state.Children.Add(addChildEvent.ChildId); + state.ChildrenCount++; + break; + case RemoveChildStateLogEvent removeChildEvent: + state.Children.Remove(removeChildEvent.ChildId); + state.ChildrenCount--; + break; + } + + base.TransitionState(state, @event); + } + + [GenerateSerializer] + public class AddChildStateLogEvent : StateLogEventBase + { + [Id(0)] public GrainId ChildId { get; set; } + } + + [GenerateSerializer] + public class RemoveChildStateLogEvent : StateLogEventBase + { + [Id(0)] public GrainId ChildId { get; set; } + } +} \ No newline at end of file diff --git a/src/Aevatar.Core/EventPublish/EventPubGrain.cs b/src/Aevatar.Core/EventPublish/EventPubGrain.cs new file mode 100644 index 00000000..f9adf8fa --- /dev/null +++ b/src/Aevatar.Core/EventPublish/EventPubGrain.cs @@ -0,0 +1,25 @@ +using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; +using Aevatar.Core.Extensions; +using Orleans.Streams; + +namespace Aevatar.Core.EventPublish; + +public class EventPubGrain : Grain, IEventPubGrain +{ + private readonly IStreamProvider _streamProvider; + + public EventPubGrain() + { + _streamProvider = this.GetStreamProvider(AevatarCoreConstants.StreamProvider); + } + + public async Task PublishEventAsync(EventWrapperBase eventWrapper) + { + var stream = _streamProvider.GetEventWrapperBaseStream(this.GetPrimaryKeyString()); + await stream.OnNextAsync(eventWrapper); + Called.Add(this.GetPrimaryKeyString()); + } + + public static List Called = []; +} \ No newline at end of file diff --git a/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs b/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs new file mode 100644 index 00000000..647b8d29 --- /dev/null +++ b/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs @@ -0,0 +1,218 @@ +using Aevatar.Core; +using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; +using Aevatar.Core.Extensions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using Orleans.EventSourcing; +using Orleans.Providers; +using Orleans.Streams; + +[GenerateSerializer] +public class StreamCoordinatorState : StateBase +{ + /// + /// Group index -> Children count. + /// + [Id(1)] public Dictionary GroupChildrenCount { get; set; } = new(); + [Id(2)] public GrainId Parent { get; set; } +} + +[GenerateSerializer] +public class StreamCoordinatorStateLogEvent : StateLogEventBase; + +[StorageProvider(ProviderName = "PubSubStore")] +[LogConsistencyProvider(ProviderName = "LogStorage")] +public class StreamCoordinatorGrain : + JournaledGrain>, + IStreamCoordinatorGrain +{ + private readonly ILogger _logger; + private readonly IStreamProvider _streamProvider; + private readonly AevatarOptions _aevatarOptions; + + public StreamCoordinatorGrain(ILogger logger) + { + _logger = logger; + _streamProvider = this.GetStreamProvider(AevatarCoreConstants.StreamProvider); + _aevatarOptions = ServiceProvider.GetRequiredService>().Value; + } + + public async Task SetParentAsync(GrainId parentGrainId) + { + if (State.Parent == parentGrainId) + { + return false; + } + RaiseEvent(new SetParentStateLogEvent{ParentGrainId = parentGrainId}); + await ConfirmEvents(); + return true; + } + + public Task GetParentAsync() + { + return Task.FromResult(State.Parent); + } + + public async Task RegisterChildAsync(GrainId childGrainId) + { + var groupIndex = State.GroupChildrenCount + .FirstOrDefault(kvp => kvp.Value <= AevatarGAgentConstants.MaxChildrenPerGroup).Key; + var childGroupGrain = GetChildGroupGrain(groupIndex); + await childGroupGrain.AddChildAsync(childGrainId); + var childrenCount = await childGroupGrain.GetChildrenCountAsync(); + RaiseEvent(new UpdateGroupChildrenCountStateLogEvent { GroupIndex = groupIndex, ChildrenCount = childrenCount }); + RaiseEvent(new RegisterChildStateLogEvent { ChildGroupGrainId = childGrainId, GroupIndex = groupIndex }); + await SubscribeToChildAsync(childGrainId, groupIndex); + await ConfirmEvents(); + } + + public async Task UnregisterChildAsync(GrainId childGrainId) + { + foreach (var group in State.GroupChildrenCount) + { + var childGroupGrain = + GrainFactory.GetGrain(group.Key, this.GetPrimaryKeyString()); + + var children = await childGroupGrain.GetChildrenAsync(); + if (children.Contains(childGrainId)) + { + await childGroupGrain.RemoveChildAsync(childGrainId); + RaiseEvent(new UnregisterChildStateLogEvent { ChildGrainId = childGrainId, GroupIndex = group.Key }); + await ConfirmEvents(); + + await UnsubscribeFromChildAsync(childGrainId, group.Key); + break; + } + } + } + + public async Task> GetChildrenAsync() + { + var allChildren = new List(); + foreach (var group in State.GroupChildrenCount) + { + var childGroupGrain = + GrainFactory.GetGrain(group.Key, this.GetPrimaryKeyString()); + var children = await childGroupGrain.GetChildrenAsync(); + allChildren.AddRange(children); + } + + return allChildren; + } + + public async Task PublishEventAsync(EventWrapperBase eventWrapper) + { + if (State.Parent == default) + { + _logger.LogInformation( + "Event is the first time appeared to silo: {@Event}", eventWrapper); + await DownwardsEventAsync(eventWrapper); + } + else + { + _logger.LogInformation( + "{GrainId} is publishing event upwards: {EventJson}", this.GetPrimaryKeyString(), + JsonConvert.SerializeObject(eventWrapper)); + await UpwardsEventAsync(eventWrapper); + } + } + + public async Task DownwardsEventAsync(EventWrapperBase eventWrapper) + { + foreach (var (groupIndex, _) in State.GroupChildrenCount) + { + var childrenGroupGrain = GetChildGroupGrain(groupIndex); + await childrenGroupGrain.DownwardsEventAsync(eventWrapper); + } + } + + public async Task UpwardsEventAsync(EventWrapperBase eventWrapper) + { + // Try self handling + var selfEventPubGrain = GrainFactory.GetGrain($"{this.GetPrimaryKeyString()}"); + await selfEventPubGrain.PublishEventAsync(eventWrapper); + + // Parent handling + if (State.Parent != default) + { + // var parentEventPubGrain = GrainFactory.GetGrain(State.Parent.ToString()); + // await parentEventPubGrain.PublishEventAsync(eventWrapper); + // To avoid too many producers on parent's corresponding PubSubRendezvousGrain state. + var parentStream = _streamProvider.GetEventWrapperBaseStream(State.Parent); + await parentStream.OnNextAsync(eventWrapper); + } + } + + private IEventPubChildrenGroupGrain GetChildGroupGrain(int groupIndex) + { + return GrainFactory.GetGrain(groupIndex, this.GetPrimaryKeyString()); + } + + private async Task SubscribeToChildAsync(GrainId childId, int groupIndex) + { + + } + + private async Task UnsubscribeFromChildAsync(GrainId childId, int groupIndex) + { + } + + protected override void TransitionState(StreamCoordinatorState state, + StateLogEventBase @event) + { + switch (@event) + { + case SetParentStateLogEvent setParentEvent: + State.Parent = setParentEvent.ParentGrainId; + break; + case UpdateGroupChildrenCountStateLogEvent updateEvent: + State.GroupChildrenCount[updateEvent.GroupIndex] = updateEvent.ChildrenCount; + if (updateEvent.ChildrenCount == AevatarGAgentConstants.MaxChildrenPerGroup) + { + State.GroupChildrenCount[updateEvent.GroupIndex + 1] = 0; + } + break; + case RegisterChildStateLogEvent registerEvent: + break; + case UnregisterChildStateLogEvent unregisterEvent: + if (state.GroupChildrenCount.ContainsKey(unregisterEvent.GroupIndex)) + { + state.GroupChildrenCount[unregisterEvent.GroupIndex] -= 1; + } + + break; + } + + base.TransitionState(state, @event); + } + + [GenerateSerializer] + public class SetParentStateLogEvent : StateLogEventBase + { + [Id(0)] public GrainId ParentGrainId { get; set; } + } + + [GenerateSerializer] + public class UpdateGroupChildrenCountStateLogEvent : StateLogEventBase + { + [Id(0)] public int GroupIndex { get; set; } + [Id(1)] public int ChildrenCount { get; set; } + } + + [GenerateSerializer] + public class RegisterChildStateLogEvent : StateLogEventBase + { + [Id(0)] public GrainId ChildGroupGrainId { get; set; } + [Id(1)] public int GroupIndex { get; set; } + } + + [GenerateSerializer] + public class UnregisterChildStateLogEvent : StateLogEventBase + { + [Id(0)] public GrainId ChildGrainId { get; set; } + [Id(1)] public int GroupIndex { get; set; } + } +} \ No newline at end of file diff --git a/src/Aevatar.Core/Extensions/StreamExtensions.cs b/src/Aevatar.Core/Extensions/StreamExtensions.cs new file mode 100644 index 00000000..648ad7e6 --- /dev/null +++ b/src/Aevatar.Core/Extensions/StreamExtensions.cs @@ -0,0 +1,16 @@ +using Aevatar.Core.Abstractions; +using Orleans.Streams; + +namespace Aevatar.Core.Extensions; + +public static class StreamExtensions +{ + public static IAsyncStream GetEventWrapperBaseStream(this IStreamProvider streamProvider, + string streamKey) + => streamProvider.GetStream(StreamId.Create(AevatarCoreStreamConfig.Prefix, streamKey)); + + public static IAsyncStream GetEventWrapperBaseStream(this IStreamProvider streamProvider, + GrainId grainId) + => streamProvider.GetStream(StreamId.Create(AevatarCoreStreamConfig.Prefix, + grainId.ToString())); +} \ No newline at end of file diff --git a/src/Aevatar.Core/GAgentBase.Publish.cs b/src/Aevatar.Core/GAgentBase.Publish.cs index 7efc36ca..f0f27f6c 100644 --- a/src/Aevatar.Core/GAgentBase.Publish.cs +++ b/src/Aevatar.Core/GAgentBase.Publish.cs @@ -14,8 +14,8 @@ protected async Task PublishAsync(EventWrapper eventWrapper) where T : Eve { try { - await SendEventUpwardsAsync(eventWrapper); - await SendEventDownwardsAsync(eventWrapper); + await _coordinator!.UpwardsEventAsync(eventWrapper); + await _coordinator!.DownwardsEventAsync(eventWrapper); } catch (Exception ex) { @@ -35,20 +35,8 @@ protected async Task PublishAsync(T @event) where T : EventBase var eventId = Guid.NewGuid(); try { - if (State.Parent == null) - { - Logger.LogInformation( - "Event is the first time appeared to silo: {@Event}", @event); - // This event is the first time appeared to silo. - await SendEventToSelfAsync(new EventWrapper(@event, eventId, GrainId)); - } - else - { - Logger.LogInformation( - "{GrainId} is publishing event upwards: {EventJson}", - GrainId.ToString(), JsonConvert.SerializeObject(@event)); - await PublishEventUpwardsAsync(@event, eventId); - } + var eventWrapper = new EventWrapper(@event, eventId, GrainId); + await _coordinator!.PublishEventAsync(eventWrapper); } catch (Exception ex) { @@ -59,79 +47,4 @@ protected async Task PublishAsync(T @event) where T : EventBase return eventId; } - - private async Task PublishEventUpwardsAsync(T @event, Guid eventId) where T : EventBase - { - try - { - await SendEventUpwardsAsync(new EventWrapper(@event, eventId, GrainId)); - Logger.LogDebug("{GrainId} published {Event} to upwards", GrainId.ToString(), - JsonConvert.SerializeObject(@event)); - } - catch (Exception ex) - { - Logger.LogError("{GrainId} failed to publish event {EventJson} upwards", GrainId.ToString(), - JsonConvert.SerializeObject(@event)); - throw new EventPublishingException($"{GrainId.ToString()} failed to publish event upwards", ex); - } - } - - private async Task SendEventUpwardsAsync(EventWrapper eventWrapper) where T : EventBase - { - if (State.Parent == null) - { - return; - } - - try - { - var stream = GetEventBaseStreamForOwn(State.Parent.Value); - await stream.OnNextAsync(eventWrapper); - } - catch (Exception ex) - { - Logger.LogError("{GrainId} failed to send event {EventWrapper} upwards", GrainId.ToString(), - eventWrapper); - throw new EventPublishingException($"{GrainId.ToString()} failed to event event upwards", ex); - } - } - - private async Task SendEventToSelfAsync(EventWrapper eventWrapper) where T : EventBase - { - Logger.LogInformation( - $"{GrainId.ToString()} is sending event to self: {JsonConvert.SerializeObject(eventWrapper)}"); - try - { - var streamOfThisGAgent = GetEventBaseStreamForOwn(GrainId); - await streamOfThisGAgent.OnNextAsync(eventWrapper); - } - catch (Exception ex) - { - Logger.LogError("{GrainId} failed to send event {EventWrapper} to itself", GrainId.ToString(), - eventWrapper); - throw new EventPublishingException($"{GrainId.ToString()} failed to event event to itself", ex); - } - } - - private async Task SendEventDownwardsAsync(EventWrapper eventWrapper) where T : EventBase - { - if (State.Children.IsNullOrEmpty()) - { - return; - } - - Logger.LogInformation($"{GrainId.ToString()} has {State.Children.Count} children."); - - try - { - var streamForChildren = GetEventBaseStreamForChildren(GrainId); - await streamForChildren.OnNextAsync(eventWrapper); - } - catch (Exception ex) - { - Logger.LogError("{GrainId} failed to send event {EventWrapper} downwards", GrainId.ToString(), - eventWrapper); - throw new EventPublishingException($"{GrainId.ToString()} failed to event event downwards", ex); - } - } } \ No newline at end of file diff --git a/src/Aevatar.Core/GAgentBase.Subscribe.cs b/src/Aevatar.Core/GAgentBase.Subscribe.cs index 41347d66..4ea54919 100644 --- a/src/Aevatar.Core/GAgentBase.Subscribe.cs +++ b/src/Aevatar.Core/GAgentBase.Subscribe.cs @@ -10,30 +10,35 @@ protected sealed override void TransitionState(TState state, StateLogEventBase { [Id(0)] public GrainId Parent { get; set; } } - + private async Task ClearParentAsync(GrainId grainId) { Logger.LogDebug("GrainId [{GrainId}] Removing parent to {Parent}", this.GetGrainId().ToString(), grainId); diff --git a/src/Aevatar.Core/GAgentBase.cs b/src/Aevatar.Core/GAgentBase.cs index b5e1c034..bd7917c6 100644 --- a/src/Aevatar.Core/GAgentBase.cs +++ b/src/Aevatar.Core/GAgentBase.cs @@ -1,5 +1,7 @@ using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; using Aevatar.Core.Abstractions.Projections; +using Aevatar.Core.Extensions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -57,6 +59,8 @@ public abstract partial class private IStateDispatcher? StateDispatcher { get; set; } protected AevatarOptions? AevatarOptions { get; private set; } + private IStreamCoordinatorGrain? _coordinator; + public async Task ActivateAsync() { await Task.Yield(); @@ -70,18 +74,18 @@ public async Task RegisterAsync(IGAgent gAgent) return; } - await AddChildAsync(gAgent.GetGrainId()); - await gAgent.SubscribeToAsync(this); - await OnRegisterAgentAsync(gAgent.GetGrainId()); + var childStreamCoordinator = GrainFactory.GetGrain(gAgent.GetGrainId().ToString()); + if (await childStreamCoordinator.SetParentAsync(this.GetGrainId())) + { + await _coordinator!.RegisterChildAsync(gAgent.GetGrainId()); + await OnRegisterAgentAsync(gAgent.GetGrainId()); + } } public async Task SubscribeToAsync(IGAgent gAgent) { - var parentGrainId = gAgent.GetGrainId(); - var parentStream = GetEventBaseStreamForChildren(parentGrainId); - var asyncObserver = new GAgentAsyncObserver(_observers); - await ResumeOrSubscribeAsync(parentStream, asyncObserver); await SetParentAsync(gAgent.GetGrainId()); + await _coordinator!.SetParentAsync(gAgent.GetGrainId()); } public async Task UnsubscribeFromAsync(IGAgent gAgent) @@ -91,8 +95,7 @@ public async Task UnsubscribeFromAsync(IGAgent gAgent) public async Task UnregisterAsync(IGAgent gAgent) { - await RemoveChildAsync(gAgent.GetGrainId()); - await gAgent.UnsubscribeFromAsync(this); + await _coordinator!.UnregisterChildAsync(gAgent.GetGrainId()); await OnUnregisterAgentAsync(gAgent.GetGrainId()); } @@ -111,9 +114,9 @@ public async Task UnregisterAsync(IGAgent gAgent) return handlingTypes.ToList(); } - public Task> GetChildrenAsync() + public async Task> GetChildrenAsync() { - return Task.FromResult(State.Children); + return await _coordinator!.GetChildrenAsync(); } public Task GetParentAsync() @@ -161,6 +164,7 @@ public async Task HandleRequestAllSubscriptionsEventAs private async Task GetGroupSubscribedEventListEvent() { + // TODO: Refactor this cause Children now stored on EventPubChildrenGroupGrain. var gAgentList = State.Children .Distinct() .Select(grainId => GrainFactory.GetGrain(grainId)) @@ -213,7 +217,7 @@ protected virtual async Task ForwardEventAsync(EventWrapperBase eventWrapper) })) { Logger.LogDebug("Forwarding event to children: {Event}", JsonConvert.SerializeObject(typedWrapper)); - await SendEventDownwardsAsync(typedWrapper); + await _coordinator!.DownwardsEventAsync(eventWrapper); } } @@ -254,33 +258,23 @@ private async Task BaseOnActivateAsync(CancellationToken cancellationToken) // This must be called first to initialize Observers field. await UpdateObserverListAsync(GetType()); - var initTasks = new[] - { - InitializeOrResumeEventBaseStreamAsync(), - ResumeEventBaseStreamForChildrenAsync(), - ActivateProjectionGrainAsync() - }; - await Task.WhenAll(initTasks); - } + _coordinator = GrainFactory.GetGrain( + this.GetGrainId().ToString()); - private async Task InitializeOrResumeEventBaseStreamAsync() - { - var streamOfThisGAgent = GetEventBaseStreamForOwn(this.GetGrainId()); - var asyncObserver = new GAgentAsyncObserver(_observers); - await ResumeOrSubscribeAsync(streamOfThisGAgent, asyncObserver); + await InitializeOrResumeEventBaseStreamAsync(); + await ActivateProjectionGrainAsync(); } - private async Task ResumeEventBaseStreamForChildrenAsync() + private async Task InitializeOrResumeEventBaseStreamAsync() { - var streamForChildren = GetEventBaseStreamForChildren(this.GetGrainId()); - var tasks = new List(); - foreach (var childGrainId in State.Children) + if (_observers.Count == 0) { - var child = await GAgentFactory.GetGAgentAsync(childGrainId); - tasks.Add(child.ResumeSubscriptionAsync(streamForChildren)); + return; } - await Task.WhenAll(tasks); + var streamOfThisGAgent = StreamProvider.GetEventWrapperBaseStream(GrainId); + var asyncObserver = new GAgentAsyncObserver(_observers); + await ResumeOrSubscribeAsync(streamOfThisGAgent, asyncObserver); } private async Task ActivateProjectionGrainAsync() @@ -362,22 +356,4 @@ protected virtual Task HandleRaiseEventAsync() // Derived classes can override this method. return Task.CompletedTask; } - - private IAsyncStream GetEventBaseStreamForOwn(GrainId grainId) - { - var grainIdString = grainId.ToString(); - return GetEventBaseStream(grainIdString); - } - - private IAsyncStream GetEventBaseStreamForChildren(GrainId grainId) - { - var grainIdString = $"{grainId.ToString()}/Children"; - return GetEventBaseStream(grainIdString); - } - - private IAsyncStream GetEventBaseStream(string streamKey) - { - var streamId = StreamId.Create(AevatarOptions!.StreamNamespace, streamKey); - return StreamProvider.GetStream(streamId); - } } \ No newline at end of file diff --git a/src/Aevatar/Extensions/OrleansHostExtensions.cs b/src/Aevatar/Extensions/OrleansHostExtensions.cs index 47cee679..cad1d816 100644 --- a/src/Aevatar/Extensions/OrleansHostExtensions.cs +++ b/src/Aevatar/Extensions/OrleansHostExtensions.cs @@ -1,3 +1,4 @@ +using Aevatar.Core.Abstractions; using Aevatar.Plugins.Extensions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -23,6 +24,9 @@ public static ISiloBuilder UseAevatar(this ISiloBuilder builder, bool includingA { services.Add(abpApplication.Services); } + + var prefix = services.GetConfiguration().GetSection("Aevatar").GetSection("StreamNamespace").Value; + AevatarCoreStreamConfig.Initialize(prefix); }); } diff --git a/test/Aevatar.Core.Tests/EventHandlingTests.cs b/test/Aevatar.Core.Tests/EventHandlingTests.cs index 14a10e31..0ee7e0b9 100644 --- a/test/Aevatar.Core.Tests/EventHandlingTests.cs +++ b/test/Aevatar.Core.Tests/EventHandlingTests.cs @@ -49,7 +49,7 @@ public async Task EventHandlerTest() var groupGAgent = await CreateGroupGAgentAsync(eventHandlerTestGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(eventHandlerTestGAgent, groupGAgent, publishingGAgent); + await AddProbesByGrainIdAsync(eventHandlerTestGAgent, groupGAgent, publishingGAgent); // Act of registering. await publishingGAgent.PublishEventAsync(new NaiveTestEvent @@ -87,7 +87,7 @@ public async Task EventWithResponseTest() var groupGAgent = await CreateGroupGAgentAsync(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent, groupGAgent, publishingGAgent); + AddProbesByGrainIdAsync(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent, groupGAgent, publishingGAgent); // Act. await publishingGAgent.PublishEventAsync(new ResponseTestEvent @@ -169,7 +169,7 @@ public async Task RequestSubscribedEventListTest() subscribeTestGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent, subscribeTestGAgent, groupGAgent, + AddProbesByGrainIdAsync(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent, subscribeTestGAgent, groupGAgent, publishingGAgent); // Act. @@ -192,7 +192,7 @@ public async Task ExceptionHandlingTest() var groupGAgent = await CreateGroupGAgentAsync(eventHandlerTestGAgent, testGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(eventHandlerTestGAgent, testGAgent, groupGAgent, publishingGAgent); + await AddProbesByGrainIdAsync(eventHandlerTestGAgent, testGAgent, groupGAgent, publishingGAgent); // Act. await publishingGAgent.PublishEventAsync(new NaiveTestEvent diff --git a/test/Aevatar.Core.Tests/EventSourcingTests.cs b/test/Aevatar.Core.Tests/EventSourcingTests.cs index 481d4518..49f994a8 100644 --- a/test/Aevatar.Core.Tests/EventSourcingTests.cs +++ b/test/Aevatar.Core.Tests/EventSourcingTests.cs @@ -19,7 +19,7 @@ public async Task LogViewAdaptorTest() var groupGAgent = await CreateGroupGAgentAsync(logViewGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(publishingGAgent, groupGAgent, logViewGAgent); + AddProbesByGrainIdAsync(publishingGAgent, groupGAgent, logViewGAgent); // Act: First event. await publishingGAgent.PublishEventAsync(new NaiveTestEvent diff --git a/test/Aevatar.Core.Tests/GAgentTestKitBase.cs b/test/Aevatar.Core.Tests/GAgentTestKitBase.cs index 01be8421..fe2e49b4 100644 --- a/test/Aevatar.Core.Tests/GAgentTestKitBase.cs +++ b/test/Aevatar.Core.Tests/GAgentTestKitBase.cs @@ -30,7 +30,7 @@ protected async Task CreateGroupGAgentAsync(params IGAgent[] gAgent return groupGAgent; } - protected void AddProbesByGrainId(params IGAgent?[] gAgents) + protected async Task AddProbesByGrainIdAsync(params IGAgent?[] gAgents) { foreach (var gAgent in gAgents) { diff --git a/test/Aevatar.Core.Tests/GroupingTests.cs b/test/Aevatar.Core.Tests/GroupingTests.cs index bc3415a9..187ecb84 100644 --- a/test/Aevatar.Core.Tests/GroupingTests.cs +++ b/test/Aevatar.Core.Tests/GroupingTests.cs @@ -72,9 +72,9 @@ public async Task MultipleGroupRegisterOneGAgentTest() // Assert: Check each group's states from GrainStorage. foreach (var groupGAgent in new List { groupGAgent1, groupGAgent2, groupGAgent3 }) { - var subscribers = await groupGAgent.GetChildrenAsync(); - subscribers.Count.ShouldBe(1); - subscribers.First().ShouldBe(naiveTestGAgent.GetGrainId()); + var children = await groupGAgent.GetChildrenAsync(); + children.Count.ShouldBe(1); + children.First().ShouldBe(naiveTestGAgent.GetGrainId()); } } @@ -163,7 +163,8 @@ public async Task SubscriptionTest() var groupGAgent = await CreateGroupGAgentAsync(marketingLeader, developingLeader); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(publishingGAgent, groupGAgent, marketingLeader, developingLeader, developer1, developer2, developer3, investor1, investor2); + AddProbesByGrainIdAsync(publishingGAgent, groupGAgent, marketingLeader, developingLeader, developer1, + developer2, developer3, investor1, investor2); // Act. await publishingGAgent.PublishEventAsync(new NewDemandTestEvent @@ -202,7 +203,7 @@ public async Task RegisterSameGrainTest() var gAgent = await Silo.CreateGrainAsync(guid); await gAgent.RegisterAsync(gAgent); var groupGAgent = await CreateGroupGAgentAsync(gAgent, gAgent); - + var subscribers = await groupGAgent.GetChildrenAsync(); subscribers.Count.ShouldBe(1); } diff --git a/test/Aevatar.Core.Tests/PublishingTests.cs b/test/Aevatar.Core.Tests/PublishingTests.cs index 9abf0578..62f9c5f1 100644 --- a/test/Aevatar.Core.Tests/PublishingTests.cs +++ b/test/Aevatar.Core.Tests/PublishingTests.cs @@ -15,7 +15,7 @@ public async Task PublishToEventHandlerTest() var groupGAgent = await CreateGroupGAgentAsync(eventHandlerTestGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(publishingGAgent, groupGAgent, eventHandlerTestGAgent); + AddProbesByGrainIdAsync(publishingGAgent, groupGAgent, eventHandlerTestGAgent); // Act. await publishingGAgent.PublishEventAsync(new NaiveTestEvent @@ -46,7 +46,7 @@ public async Task MultiLevelDownwardsTest() var level1 = await CreateGroupGAgentAsync(level2A, level2B); var publishingGAgent = await CreatePublishingGAgentAsync(level1); - AddProbesByGrainId(publishingGAgent, level1, level2A, level2B, level3A, level3B); + AddProbesByGrainIdAsync(publishingGAgent, level1, level2A, level2B, level3A, level3B); // Act. await publishingGAgent.PublishEventAsync(new NaiveTestEvent @@ -78,7 +78,7 @@ public async Task MultiLevelUpwardsTest() var level1 = await CreateGroupGAgentAsync(level2A, level2B); var publishingGAgent = await CreatePublishingGAgentAsync(level1); - AddProbesByGrainId(publishingGAgent, level1, level2A, level2B, level3A, level3B); + AddProbesByGrainIdAsync(publishingGAgent, level1, level2A, level2B, level3A, level3B); // Act: ResponseTestEvent will cause level32 publish an NaiveTestEvent. await publishingGAgent.PublishEventAsync(new ResponseTestEvent @@ -113,7 +113,7 @@ public async Task RegisterSameGuidTest() await level1.RegisterAsync(level2B); var publishingGAgent = await CreatePublishingGAgentAsync(level1); - AddProbesByGrainId(publishingGAgent, level1, level2A, level2B, level3A, level3B); + AddProbesByGrainIdAsync(publishingGAgent, level1, level2A, level2B, level3A, level3B); // Act: ResponseTestEvent will cause level32 publish an NaiveTestEvent. await publishingGAgent.PublishEventAsync(new ResponseTestEvent diff --git a/test/Aevatar.Core.Tests/TestGAgents/DeveloperTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/DeveloperTestGAgent.cs index 2da6ac2e..25c16db9 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/DeveloperTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/DeveloperTestGAgent.cs @@ -19,14 +19,7 @@ public override Task GetDescriptionAsync() public async Task HandleEventAsync(DevelopTaskTestEvent eventData) { - if (State.Content.IsNullOrEmpty()) - { - State.Content = []; - } - State.Content.Add(eventData.Description); - - Logger.LogInformation("TEST"); return new NewFeatureCompletedTestEvent { PullRequestUrl = $"PR for {eventData.Description}" diff --git a/test/Aevatar.Core.Tests/TestGAgents/DevelopingLeaderTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/DevelopingLeaderTestGAgent.cs index e3b5d0ea..c44b2c1d 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/DevelopingLeaderTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/DevelopingLeaderTestGAgent.cs @@ -27,11 +27,6 @@ await PublishAsync(new DevelopTaskTestEvent public async Task HandleEventAsync(NewFeatureCompletedTestEvent eventData) { - if (State.Content.IsNullOrEmpty()) - { - State.Content = []; - } - State.Content.Add(eventData.PullRequestUrl); if (State.Content.Count == 3) diff --git a/test/Aevatar.Core.Tests/TestGAgents/InvestorTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/InvestorTestGAgent.cs index b94ee457..653b6e64 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/InvestorTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/InvestorTestGAgent.cs @@ -4,7 +4,7 @@ namespace Aevatar.Core.Tests.TestGAgents; -public interface IInvestorTestGAgent: IGAgent +public interface IInvestorTestGAgent: IStateGAgent { } @@ -25,11 +25,6 @@ public override Task GetDescriptionAsync() public async Task HandleEventAsync(WorkingOnTestEvent eventData) { - if (State.Content.IsNullOrEmpty()) - { - State.Content = []; - } - State.Content.Add(eventData.Description); await PublishAsync(new InvestorFeedbackTestEvent diff --git a/test/Aevatar.Core.Tests/TestGAgents/MarketingLeaderTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/MarketingLeaderTestGAgent.cs index 28542e21..09513a9c 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/MarketingLeaderTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/MarketingLeaderTestGAgent.cs @@ -1,6 +1,5 @@ using Aevatar.Core.Abstractions; using Aevatar.Core.Tests.TestEvents; -using Microsoft.Extensions.Logging; namespace Aevatar.Core.Tests.TestGAgents; @@ -9,7 +8,8 @@ public interface IMarketingLeaderTestGAgent : IGAgent; public class MarketingLeaderTestGAgentState : NaiveTestGAgentState; [GAgent("marketingLeader", "test")] -public class MarketingLeaderTestGAgent : GAgentBase, IMarketingLeaderTestGAgent +public class MarketingLeaderTestGAgent : GAgentBase, + IMarketingLeaderTestGAgent { public override Task GetDescriptionAsync() { @@ -31,14 +31,9 @@ await PublishAsync(new WorkingOnTestEvent Description = $"Working completed: {eventData.PullRequestUrl}" }); } - + public async Task HandleEventAsync(InvestorFeedbackTestEvent eventData) { - if (State.Content.IsNullOrEmpty()) - { - State.Content = []; - } - State.Content.Add($"Feedback from investor: {eventData.Content}"); } } \ No newline at end of file diff --git a/test/Aevatar.Core.Tests/TestGAgents/NaiveTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/NaiveTestGAgent.cs index 9ded83e5..8b04389d 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/NaiveTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/NaiveTestGAgent.cs @@ -7,7 +7,7 @@ namespace Aevatar.Core.Tests.TestGAgents; [GenerateSerializer] public class NaiveTestGAgentState : StateBase { - [Id(0)] public List Content { get; set; } + [Id(0)] public List Content { get; set; } = []; } public class NaiveTestStateLogEvent : StateLogEventBase diff --git a/test/Aevatar.GAgents.Tests/GAgentBaseTests.cs b/test/Aevatar.GAgents.Tests/GAgentBaseTests.cs index dc97cbbd..c52a7bbe 100644 --- a/test/Aevatar.GAgents.Tests/GAgentBaseTests.cs +++ b/test/Aevatar.GAgents.Tests/GAgentBaseTests.cs @@ -13,7 +13,7 @@ public GAgentBaseTests() { _grainFactory = GetRequiredService(); } - + [Fact(DisplayName = "Can use ConfigAsync method to config GAgent.")] public async Task ConfigurationTest() { @@ -47,12 +47,17 @@ public async Task ComplicatedEventHandleTest() var developer1 = _grainFactory.GetGrain(guid); var developer2 = _grainFactory.GetGrain(Guid.NewGuid()); var developer3 = _grainFactory.GetGrain(Guid.NewGuid()); + await developer1.ActivateAsync(); + await developer2.ActivateAsync(); + await developer3.ActivateAsync(); await developingLeader.RegisterAsync(developer1); await developingLeader.RegisterAsync(developer2); await developingLeader.RegisterAsync(developer3); var investor1 = _grainFactory.GetGrain>(guid); var investor2 = _grainFactory.GetGrain>(Guid.NewGuid()); + await investor1.ActivateAsync(); + await investor2.ActivateAsync(); await marketingLeader.RegisterAsync(investor1); await marketingLeader.RegisterAsync(investor2); @@ -69,14 +74,13 @@ await publishingGAgent.PublishEventAsync(new NewDemandTestEvent }); await TestHelper.WaitUntilAsync(_ => CheckState(investor1), TimeSpan.FromSeconds(20)); - var groupState = await groupGAgent.GetStateAsync(); groupState.RegisteredGAgents.ShouldBe(2); var investorState = await investor1.GetStateAsync(); investorState.Content.Count.ShouldBe(2); } - + private async Task CheckState(IStateGAgent investor1) { var state = await investor1.GetStateAsync(); diff --git a/test/Aevatar.GAgents.Tests/GAgentEventPubTests.cs b/test/Aevatar.GAgents.Tests/GAgentEventPubTests.cs new file mode 100644 index 00000000..82e47ab2 --- /dev/null +++ b/test/Aevatar.GAgents.Tests/GAgentEventPubTests.cs @@ -0,0 +1,55 @@ +using Aevatar.Core.Abstractions; +using Aevatar.Core.EventPublish; +using Aevatar.Core.Tests.TestEvents; +using Aevatar.Core.Tests.TestGAgents; +using Shouldly; +using Xunit.Abstractions; + +namespace Aevatar.GAgents.Tests; + +public sealed class GAgentEventPubTests : AevatarGAgentsTestBase +{ + private readonly ITestOutputHelper _outputHelper; + private readonly IGAgentFactory _gAgentFactory; + + public GAgentEventPubTests(ITestOutputHelper outputHelper) + { + _outputHelper = outputHelper; + _gAgentFactory = GetRequiredService(); + } + + [Fact] + public async Task MultipleLevelTest() + { + // Arrange. + var marketingLeader = await _gAgentFactory.GetGAgentAsync(); + + var investor1 = await _gAgentFactory.GetGAgentAsync(); + var investor2 = await _gAgentFactory.GetGAgentAsync(); + await marketingLeader.RegisterAsync(investor1); + //await marketingLeader.RegisterAsync(investor2); + + var groupGAgent = await _gAgentFactory.GetGAgentAsync>(); + await groupGAgent.RegisterAsync(marketingLeader); + var publishingGAgent = await _gAgentFactory.GetGAgentAsync(); + await publishingGAgent.RegisterAsync(groupGAgent); + + // Act. + await publishingGAgent.PublishEventAsync(new NewDemandTestEvent + { + Description = "New demand from customer." + }); + + await TestHelper.WaitUntilAsync(_ => CheckState(investor1), TimeSpan.FromSeconds(20)); + + EventPubGrain.Called.Count.ShouldBePositive(); + var investorState = await investor1.GetStateAsync(); + investorState.Content.Count.ShouldBe(1); + } + + private async Task CheckState(IStateGAgent investor1) + { + var state = await investor1.GetStateAsync(); + return !state.Content.IsNullOrEmpty() && state.Content.Count == 1; + } +} \ No newline at end of file diff --git a/test/OrleansTestKit/TestGrainFactory.cs b/test/OrleansTestKit/TestGrainFactory.cs index d83c0b5f..389a5199 100644 --- a/test/OrleansTestKit/TestGrainFactory.cs +++ b/test/OrleansTestKit/TestGrainFactory.cs @@ -1,4 +1,5 @@ -using Moq; +using Aevatar.Core.Abstractions.EventPublish; +using Moq; using Orleans.Runtime; namespace Orleans.TestKit; @@ -102,10 +103,13 @@ internal void AddProbe(Func> factory) AddProbe(adaptedFactory); } - private static string GetKey(IdSpan identity, Type stateType, string? classPrefix = null) => - classPrefix == null - ? $"{stateType.FullName}-{identity}" - : $"{stateType.FullName}-{classPrefix}-{identity}"; + private static string GetKey(IdSpan identity, Type stateType, string? classPrefix = null) + { + var prefix = stateType.Name!.ToLower()[1..^5]; + return classPrefix == null + ? $"{prefix}/{identity}" + : $"{prefix}/{classPrefix}/{identity}"; + } private T GetProbe(IdSpan identity, string? grainClassNamePrefix) where T : IGrain @@ -117,6 +121,7 @@ private T GetProbe(IdSpan identity, string? grainClassNamePrefix) private IGrain GetProbe(Type grainType, IdSpan identity, string? grainClassNamePrefix) { var key = GetKey(identity, grainType, grainClassNamePrefix); + if (_probes.TryGetValue(key, out var grain)) { return grain; diff --git a/test/OrleansTestKit/TestKitSilo.cs b/test/OrleansTestKit/TestKitSilo.cs index 4bbabbde..25f59c9a 100644 --- a/test/OrleansTestKit/TestKitSilo.cs +++ b/test/OrleansTestKit/TestKitSilo.cs @@ -2,6 +2,8 @@ using Aevatar; using Aevatar.Core; using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; +using Aevatar.Core.EventPublish; using Aevatar.EventSourcing.Core; using Aevatar.EventSourcing.Core.Hosting; using Aevatar.EventSourcing.Core.LogConsistency; @@ -300,6 +302,16 @@ public async Task CreateGrainAsync(IdSpan identity, CancellationToken canc await GetReminderActivationContext(grain, cancellation).ConfigureAwait(false); } + if (typeof(IGAgent).IsAssignableFrom(typeof(T))) + { + var streamCoordinatorGrain = await CreateGrainAsync(grainId.ToString()); + GrainFactory.AddProbe(streamCoordinatorGrain.GetGrainId(), streamCoordinatorGrain); + var childrenGroupGrain = await CreateGrainAsync(0, grainId.ToString()); + GrainFactory.AddProbe(childrenGroupGrain.GetGrainId(), childrenGroupGrain); + var eventPubGrain = await CreateGrainAsync(grainId.ToString()); + GrainFactory.AddProbe(eventPubGrain.GetGrainId(), eventPubGrain); + } + await grain.OnActivateAsync(cancellation).ConfigureAwait(false); _activatedGrains.Add(grain); From 7cd23b359ed177eeceec20371a85a74a98b0109e Mon Sep 17 00:00:00 2001 From: eanzhao Date: Mon, 7 Apr 2025 15:48:07 +0800 Subject: [PATCH 5/6] fix test cases. --- .../EventPublish/EventPubGrain.cs | 4 +-- .../EventPublish/StreamCoordinatorGrain.cs | 7 ++-- .../Extensions/EventWrapperBaseExtensions.cs | 36 +++++++++++++++++++ src/Aevatar.Core/GAgentBase.Observers.cs | 9 ++--- src/Aevatar.Core/GAgentBase.cs | 3 +- test/Aevatar.Core.Tests/EventHandlingTests.cs | 12 +++---- test/Aevatar.Core.Tests/EventSourcingTests.cs | 7 ++-- 7 files changed, 56 insertions(+), 22 deletions(-) create mode 100644 src/Aevatar.Core/Extensions/EventWrapperBaseExtensions.cs diff --git a/src/Aevatar.Core/EventPublish/EventPubGrain.cs b/src/Aevatar.Core/EventPublish/EventPubGrain.cs index f9adf8fa..f3c78277 100644 --- a/src/Aevatar.Core/EventPublish/EventPubGrain.cs +++ b/src/Aevatar.Core/EventPublish/EventPubGrain.cs @@ -18,8 +18,8 @@ public async Task PublishEventAsync(EventWrapperBase eventWrapper) { var stream = _streamProvider.GetEventWrapperBaseStream(this.GetPrimaryKeyString()); await stream.OnNextAsync(eventWrapper); - Called.Add(this.GetPrimaryKeyString()); + Called.Add(Called.Count + this.GetPrimaryKeyString(), eventWrapper); } - public static List Called = []; + public static Dictionary Called = []; } \ No newline at end of file diff --git a/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs b/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs index 647b8d29..46b34bed 100644 --- a/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs +++ b/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs @@ -132,8 +132,11 @@ public async Task DownwardsEventAsync(EventWrapperBase eventWrapper) public async Task UpwardsEventAsync(EventWrapperBase eventWrapper) { // Try self handling - var selfEventPubGrain = GrainFactory.GetGrain($"{this.GetPrimaryKeyString()}"); - await selfEventPubGrain.PublishEventAsync(eventWrapper); + if (eventWrapper.GetPublisherGrainId().ToString() != this.GetPrimaryKeyString()) + { + var selfEventPubGrain = GrainFactory.GetGrain($"{this.GetPrimaryKeyString()}"); + await selfEventPubGrain.PublishEventAsync(eventWrapper); + } // Parent handling if (State.Parent != default) diff --git a/src/Aevatar.Core/Extensions/EventWrapperBaseExtensions.cs b/src/Aevatar.Core/Extensions/EventWrapperBaseExtensions.cs new file mode 100644 index 00000000..6cb5de5d --- /dev/null +++ b/src/Aevatar.Core/Extensions/EventWrapperBaseExtensions.cs @@ -0,0 +1,36 @@ +using Aevatar.Core.Abstractions; + +namespace Aevatar.Core.Extensions; + +public static class EventWrapperBaseExtensions +{ + public static GrainId GetGrainId(this EventWrapperBase eventWrapper) + { + return (GrainId)eventWrapper.GetType().GetProperty(nameof(EventWrapper.GrainId)) + ?.GetValue(eventWrapper)!; + } + + public static GrainId GetPublisherGrainId(this EventWrapperBase eventWrapper) + { + return (GrainId)eventWrapper.GetType().GetProperty(nameof(EventWrapper.PublisherGrainId)) + ?.GetValue(eventWrapper)!; + } + + public static Guid? GetCorrelationId(this EventWrapperBase eventWrapper) + { + return (Guid?)eventWrapper.GetType().GetProperty(nameof(EventWrapper.CorrelationId)) + ?.GetValue(eventWrapper)!; + } + + public static EventBase GetEvent(this EventWrapperBase eventWrapper) + { + return (EventBase)eventWrapper.GetType().GetProperty(nameof(EventWrapper.Event)) + ?.GetValue(eventWrapper)!; + } + + public static Guid GetEventId(this EventWrapperBase eventWrapper) + { + return (Guid)eventWrapper.GetType().GetProperty(nameof(EventWrapper.EventId)) + ?.GetValue(eventWrapper)!; + } +} \ No newline at end of file diff --git a/src/Aevatar.Core/GAgentBase.Observers.cs b/src/Aevatar.Core/GAgentBase.Observers.cs index 035edbe7..cb15d2f6 100644 --- a/src/Aevatar.Core/GAgentBase.Observers.cs +++ b/src/Aevatar.Core/GAgentBase.Observers.cs @@ -97,12 +97,9 @@ private EventWrapperBaseAsyncObserver CreateMethodObserver( { try { - var eventId = (Guid)item.GetType().GetProperty(nameof(EventWrapper.EventId)) - ?.GetValue(item)!; - var eventType = (TEvent)item.GetType().GetProperty(nameof(EventWrapper.Event)) - ?.GetValue(item)!; - var grainId = (GrainId)item.GetType().GetProperty(nameof(EventWrapper.GrainId)) - ?.GetValue(item)!; + var eventId = item.GetEventId(); + var eventType = (TEvent)item.GetEvent(); + var grainId = item.GetGrainId(); var eventWrapper = new EventWrapper(eventType, eventId, grainId); diff --git a/src/Aevatar.Core/GAgentBase.cs b/src/Aevatar.Core/GAgentBase.cs index bd7917c6..85a441b6 100644 --- a/src/Aevatar.Core/GAgentBase.cs +++ b/src/Aevatar.Core/GAgentBase.cs @@ -165,7 +165,8 @@ public async Task HandleRequestAllSubscriptionsEventAs private async Task GetGroupSubscribedEventListEvent() { // TODO: Refactor this cause Children now stored on EventPubChildrenGroupGrain. - var gAgentList = State.Children + var children = await _coordinator!.GetChildrenAsync(); + var gAgentList = children .Distinct() .Select(grainId => GrainFactory.GetGrain(grainId)) .ToList(); diff --git a/test/Aevatar.Core.Tests/EventHandlingTests.cs b/test/Aevatar.Core.Tests/EventHandlingTests.cs index 0ee7e0b9..7dac028e 100644 --- a/test/Aevatar.Core.Tests/EventHandlingTests.cs +++ b/test/Aevatar.Core.Tests/EventHandlingTests.cs @@ -23,7 +23,7 @@ public async Task EventHandlerRecognizeTest() subscribedEventList.ShouldNotBeNull(); subscribedEventList.Count.ShouldBe(4); subscribedEventList.ShouldContain(typeof(NaiveTestEvent)); - subscribedEventList.Count(e => e == typeof(NaiveTestEvent)).ShouldBe(2); + subscribedEventList.Count(e => e == typeof(NaiveTestEvent)).ShouldBe(3); subscribedEventList.ShouldContain(typeof(EventWrapperBase)); } @@ -33,9 +33,9 @@ public async Task EventHandlerRecognizeTest() // Assert. subscribedEventList.ShouldNotBeNull(); - subscribedEventList.Count.ShouldBe(4); + subscribedEventList.Count.ShouldBe(5); subscribedEventList.ShouldContain(typeof(NaiveTestEvent)); - subscribedEventList.Count(e => e == typeof(NaiveTestEvent)).ShouldBe(2); + subscribedEventList.Count(e => e == typeof(NaiveTestEvent)).ShouldBe(3); subscribedEventList.ShouldContain(typeof(EventWrapperBase)); subscribedEventList.ShouldContain(typeof(RequestAllSubscriptionsEvent)); } @@ -169,7 +169,7 @@ public async Task RequestSubscribedEventListTest() subscribeTestGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainIdAsync(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent, subscribeTestGAgent, groupGAgent, + await AddProbesByGrainIdAsync(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent, subscribeTestGAgent, groupGAgent, publishingGAgent); // Act. @@ -177,8 +177,8 @@ public async Task RequestSubscribedEventListTest() // Assert. var state = await subscribeTestGAgent.GetStateAsync(); - state.SubscriptionInfo.Count.ShouldBe(4); - state.SubscriptionInfo[typeof(EventHandlerTestGAgent)].Count.ShouldBe(3); + state.SubscriptionInfo.Count.ShouldBe(3); + state.SubscriptionInfo[typeof(EventHandlerTestGAgent)].Count.ShouldBe(4); state.SubscriptionInfo[typeof(EventHandlerWithResponseTestGAgent)].Count.ShouldBe(1); state.SubscriptionInfo[typeof(SubscribeTestGAgent)].Count.ShouldBe(1); } diff --git a/test/Aevatar.Core.Tests/EventSourcingTests.cs b/test/Aevatar.Core.Tests/EventSourcingTests.cs index 49f994a8..aa3d7d1d 100644 --- a/test/Aevatar.Core.Tests/EventSourcingTests.cs +++ b/test/Aevatar.Core.Tests/EventSourcingTests.cs @@ -56,15 +56,12 @@ await publishingGAgent.PublishEventAsync(new NaiveTestEvent (await logViewGAgent.GetStateAsync()).Content.Count.ShouldBe(3); } - const int minimum = 1; // SetParent or AddChildren event. // Asset: Check the log storage. InMemoryLogConsistentStorage.Storage.Count.ShouldBeGreaterThanOrEqualTo(3); InMemoryLogConsistentStorage.Storage.ShouldContainKey(GetStreamName(logViewGAgent.GetGrainId())); - InMemoryLogConsistentStorage.Storage[GetStreamName(logViewGAgent.GetGrainId())].Count.ShouldBe(minimum + 3); + InMemoryLogConsistentStorage.Storage[GetStreamName(logViewGAgent.GetGrainId())].Count.ShouldBe(3); InMemoryLogConsistentStorage.Storage.ShouldContainKey(GetStreamName(groupGAgent.GetGrainId())); - InMemoryLogConsistentStorage.Storage[GetStreamName(groupGAgent.GetGrainId())].Count.ShouldBe(minimum + 2); - InMemoryLogConsistentStorage.Storage.ShouldContainKey(GetStreamName(publishingGAgent.GetGrainId())); - InMemoryLogConsistentStorage.Storage[GetStreamName(publishingGAgent.GetGrainId())].Count.ShouldBe(minimum); + InMemoryLogConsistentStorage.Storage[GetStreamName(groupGAgent.GetGrainId())].Count.ShouldBe(1); } private async Task CheckCount(LogViewAdaptorTestGAgent gAgent, int expectedCount) From ecb43abbc6eff970b055f8e846fc243a683dd0cf Mon Sep 17 00:00:00 2001 From: eanzhao Date: Mon, 7 Apr 2025 21:50:58 +0800 Subject: [PATCH 6/6] feat: add test cases for register & unregister. --- .../EventPublish/IStreamCoordinatorGrain.cs | 7 ++- .../EventPublish/EventPubGrain.cs | 3 -- .../EventPublish/StreamCoordinatorGrain.cs | 12 ++--- src/Aevatar.Core/GAgentBase.cs | 40 +++++++++----- .../GAgentEventPubTests.cs | 54 +++++++++++++++++-- 5 files changed, 89 insertions(+), 27 deletions(-) diff --git a/src/Aevatar.Core.Abstractions/EventPublish/IStreamCoordinatorGrain.cs b/src/Aevatar.Core.Abstractions/EventPublish/IStreamCoordinatorGrain.cs index ef965a4c..8bfc0e1b 100644 --- a/src/Aevatar.Core.Abstractions/EventPublish/IStreamCoordinatorGrain.cs +++ b/src/Aevatar.Core.Abstractions/EventPublish/IStreamCoordinatorGrain.cs @@ -1,3 +1,5 @@ +using Orleans.Concurrency; + namespace Aevatar.Core.Abstractions.EventPublish; public interface IStreamCoordinatorGrain : IGrainWithStringKey @@ -5,11 +7,14 @@ public interface IStreamCoordinatorGrain : IGrainWithStringKey Task SetParentAsync(GrainId parentGrainId); Task ClearParentAsync(GrainId parentGrainId); Task SetGroupIndexAsync(int groupIndex); + [ReadOnly] Task GetGroupIndexAsync(); + [ReadOnly] Task GetParentAsync(); Task RegisterChildAsync(GrainId childGrainId); - Task RegisterManyChildAsync(List childrenGrainIds); + Task RegisterManyChildAsync(List childrenGrainIds); Task UnregisterChildAsync(GrainId childGrainId); + [ReadOnly] Task> GetChildrenAsync(); Task PublishEventAsync(EventWrapperBase eventWrapper); Task DownwardsEventAsync(EventWrapperBase eventWrapper); diff --git a/src/Aevatar.Core/EventPublish/EventPubGrain.cs b/src/Aevatar.Core/EventPublish/EventPubGrain.cs index f3c78277..3d9399e9 100644 --- a/src/Aevatar.Core/EventPublish/EventPubGrain.cs +++ b/src/Aevatar.Core/EventPublish/EventPubGrain.cs @@ -18,8 +18,5 @@ public async Task PublishEventAsync(EventWrapperBase eventWrapper) { var stream = _streamProvider.GetEventWrapperBaseStream(this.GetPrimaryKeyString()); await stream.OnNextAsync(eventWrapper); - Called.Add(Called.Count + this.GetPrimaryKeyString(), eventWrapper); } - - public static Dictionary Called = []; } \ No newline at end of file diff --git a/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs b/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs index 6d93deeb..a45c0a7c 100644 --- a/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs +++ b/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs @@ -34,13 +34,11 @@ public class StreamCoordinatorGrain : { private readonly ILogger _logger; private readonly IStreamProvider _streamProvider; - private readonly AevatarOptions _aevatarOptions; public StreamCoordinatorGrain(ILogger logger) { _logger = logger; _streamProvider = this.GetStreamProvider(AevatarCoreConstants.StreamProvider); - _aevatarOptions = ServiceProvider.GetRequiredService>().Value; } public async Task SetParentAsync(GrainId parentGrainId) @@ -56,7 +54,7 @@ public async Task SetParentAsync(GrainId parentGrainId) public async Task ClearParentAsync(GrainId parentGrainId) { - base.RaiseEvent(new ClearParentStateLogEvent + RaiseEvent(new ClearParentStateLogEvent { Parent = parentGrainId }); @@ -65,7 +63,7 @@ public async Task ClearParentAsync(GrainId parentGrainId) public async Task SetGroupIndexAsync(int groupIndex) { - base.RaiseEvent(new SetGroupIndexStateLogEvent + RaiseEvent(new SetGroupIndexStateLogEvent { GroupIndex = groupIndex }); @@ -94,7 +92,7 @@ public async Task RegisterChildAsync(GrainId childGrainId) return groupIndex; } - public async Task RegisterManyChildAsync(List childrenGrainIds) + public async Task RegisterManyChildAsync(List childrenGrainIds) { var count = childrenGrainIds.Count; var groupIndex = State.GroupChildrenCount @@ -104,11 +102,13 @@ public async Task RegisterManyChildAsync(List childrenGrainIds) await childGroupGrain.AddManyChildAsync(childrenGrainIds); RaiseEvent(new UpdateGroupChildrenCountStateLogEvent { GroupIndex = groupIndex, ChildrenCount = childrenCount + count }); await ConfirmEvents(); + return groupIndex; } public async Task UnregisterChildAsync(GrainId childGrainId) { var childStreamCoordinator = GrainFactory.GetGrain(childGrainId.ToString()); + await childStreamCoordinator.ClearParentAsync(this.GetGrainId()); var groupIndex = await childStreamCoordinator.GetGroupIndexAsync(); var childGroupGrain = GrainFactory.GetGrain(groupIndex, this.GetPrimaryKeyString()); @@ -199,7 +199,7 @@ protected override void TransitionState(StreamCoordinatorState state, State.ParentGrainId = setParentEvent.ParentGrainId; break; case ClearParentStateLogEvent clearParentStateLogEvent: - if (State.ParentGrainId == clearParentStateLogEvent.Parent) + if (clearParentStateLogEvent.Parent.ToString().Contains(State.ParentGrainId.ToString())) State.ParentGrainId = default; break; case SetGroupIndexStateLogEvent setGroupIndexStateLogEvent: diff --git a/src/Aevatar.Core/GAgentBase.cs b/src/Aevatar.Core/GAgentBase.cs index f6a408e2..bd0d5d7a 100644 --- a/src/Aevatar.Core/GAgentBase.cs +++ b/src/Aevatar.Core/GAgentBase.cs @@ -68,18 +68,21 @@ public async Task ActivateAsync() public async Task RegisterAsync(IGAgent gAgent) { - if (gAgent.GetGrainId() == this.GetGrainId()) + var grainId = gAgent.GetGrainId(); + if (grainId == this.GetGrainId()) { Logger.LogError($"Cannot register GAgent with same GrainId."); return; } - var childStreamCoordinator = GrainFactory.GetGrain(gAgent.GetGrainId().ToString()); + Logger.LogDebug("GrainId [{GrainId}] register child {Parent}", GrainId.ToString(), grainId.ToString()); + + var childStreamCoordinator = GrainFactory.GetGrain(grainId.ToString()); if (await childStreamCoordinator.SetParentAsync(GrainId)) { - var groupIndex = await _coordinator!.RegisterChildAsync(gAgent.GetGrainId()); + var groupIndex = await _coordinator!.RegisterChildAsync(grainId); await childStreamCoordinator.SetGroupIndexAsync(groupIndex); - await OnRegisterAgentAsync(gAgent.GetGrainId()); + await OnRegisterAgentAsync(grainId); } } @@ -98,36 +101,45 @@ public async Task RegisterManyAsync(List gAgents) var grainIds = gAgents.Select(g => g.GetGrainId()).ToList(); var successGrainIds = new List(); - var tasks = new List(); + var streamCoordinators = new List(); foreach (var gAgent in gAgents) { - var childGrainId = gAgent.GetGrainId(); - var childStreamCoordinator = GrainFactory.GetGrain(childGrainId.ToString()); + var childStreamCoordinator = GrainFactory.GetGrain(gAgent.GetGrainId().ToString()); + streamCoordinators.Add(childStreamCoordinator); if (await childStreamCoordinator.SetParentAsync(GrainId)) { - successGrainIds.Add(childGrainId); + successGrainIds.Add(gAgent.GetGrainId()); } } - tasks.Add(_coordinator!.RegisterManyChildAsync(successGrainIds)); - tasks.Add(OnRegisterAgentManyAsync(grainIds)); - await Task.WhenAll(tasks); + // TODO: Optimize. + var groupIndex = await _coordinator!.RegisterManyChildAsync(successGrainIds); + foreach (var coordinator in streamCoordinators) + { + await coordinator.SetGroupIndexAsync(groupIndex); + } + + await OnRegisterAgentManyAsync(grainIds); } public async Task SubscribeToAsync(IGAgent gAgent) { - await _coordinator!.SetParentAsync(gAgent.GetGrainId()); + var grainId = gAgent.GetGrainId(); + Logger.LogDebug("GrainId [{GrainId}] subscribe to {Parent}", GrainId.ToString(), grainId.ToString()); + await _coordinator!.SetParentAsync(grainId); } public async Task UnsubscribeFromAsync(IGAgent gAgent) { var grainId = gAgent.GetGrainId(); - Logger.LogDebug("GrainId [{GrainId}] Removing parent {Parent}", this.GetGrainId().ToString(), grainId); + Logger.LogDebug("GrainId [{GrainId}] unsubscribe from {Parent}", GrainId.ToString(), grainId.ToString()); await _coordinator!.ClearParentAsync(grainId); } public async Task UnregisterAsync(IGAgent gAgent) { + var grainId = gAgent.GetGrainId(); + Logger.LogDebug("GrainId [{GrainId}] unregister child {Child}", GrainId.ToString(), grainId.ToString()); await _coordinator!.UnregisterChildAsync(gAgent.GetGrainId()); await OnUnregisterAgentAsync(gAgent.GetGrainId()); } @@ -201,6 +213,8 @@ private async Task GetGroupSubscribedEventListEvent() var gAgentList = children .Distinct() .Select(grainId => GrainFactory.GetGrain(grainId)) + .GroupBy(g => g.GetType()) + .Select(g => g.First()) .ToList(); if (gAgentList.IsNullOrEmpty()) diff --git a/test/Aevatar.GAgents.Tests/GAgentEventPubTests.cs b/test/Aevatar.GAgents.Tests/GAgentEventPubTests.cs index 82e47ab2..6f70328d 100644 --- a/test/Aevatar.GAgents.Tests/GAgentEventPubTests.cs +++ b/test/Aevatar.GAgents.Tests/GAgentEventPubTests.cs @@ -1,4 +1,5 @@ using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; using Aevatar.Core.EventPublish; using Aevatar.Core.Tests.TestEvents; using Aevatar.Core.Tests.TestGAgents; @@ -11,11 +12,51 @@ public sealed class GAgentEventPubTests : AevatarGAgentsTestBase { private readonly ITestOutputHelper _outputHelper; private readonly IGAgentFactory _gAgentFactory; + private readonly IGrainFactory _grainFactory; public GAgentEventPubTests(ITestOutputHelper outputHelper) { _outputHelper = outputHelper; _gAgentFactory = GetRequiredService(); + _grainFactory = GetRequiredService(); + } + + [Fact] + public async Task RegisterTest() + { + var groupGAgent = await _gAgentFactory.GetGAgentAsync>(); + var gAgent = await _gAgentFactory.GetGAgentAsync(); + await groupGAgent.RegisterAsync(gAgent); + + var parent = await gAgent.GetParentAsync(); + parent.ShouldBe(groupGAgent.GetGrainId()); + + var children = await groupGAgent.GetChildrenAsync(); + children.Count.ShouldBe(1); + children.First().ShouldBe(gAgent.GetGrainId()); + + var childrenGroupGrain = + _grainFactory.GetGrain(0, groupGAgent.GetGrainId().ToString()); + (await childrenGroupGrain.GetChildrenCountAsync()).ShouldBe(1); + (await childrenGroupGrain.GetChildrenAsync()).First().ShouldBe(gAgent.GetGrainId()); + } + + [Fact] + public async Task UnregisterTest() + { + var groupGAgent = await _gAgentFactory.GetGAgentAsync>(); + var gAgent1 = await _gAgentFactory.GetGAgentAsync(); + var gAgent2 = await _gAgentFactory.GetGAgentAsync(); + await groupGAgent.RegisterAsync(gAgent1); + await groupGAgent.RegisterAsync(gAgent2); + await groupGAgent.UnregisterAsync(gAgent1); + + var parent = await gAgent1.GetParentAsync(); + parent.ShouldBe(default); + + var children = await groupGAgent.GetChildrenAsync(); + children.Count.ShouldBe(1); + children.First().ShouldBe(gAgent2.GetGrainId()); } [Fact] @@ -27,7 +68,7 @@ public async Task MultipleLevelTest() var investor1 = await _gAgentFactory.GetGAgentAsync(); var investor2 = await _gAgentFactory.GetGAgentAsync(); await marketingLeader.RegisterAsync(investor1); - //await marketingLeader.RegisterAsync(investor2); + await marketingLeader.RegisterAsync(investor2); var groupGAgent = await _gAgentFactory.GetGAgentAsync>(); await groupGAgent.RegisterAsync(marketingLeader); @@ -42,9 +83,14 @@ await publishingGAgent.PublishEventAsync(new NewDemandTestEvent await TestHelper.WaitUntilAsync(_ => CheckState(investor1), TimeSpan.FromSeconds(20)); - EventPubGrain.Called.Count.ShouldBePositive(); - var investorState = await investor1.GetStateAsync(); - investorState.Content.Count.ShouldBe(1); + { + var investorState = await investor1.GetStateAsync(); + investorState.Content.Count.ShouldBe(1); + } + { + var investorState = await investor2.GetStateAsync(); + investorState.Content.Count.ShouldBe(1); + } } private async Task CheckState(IStateGAgent investor1)