diff --git a/samples/PluginGAgent/PluginGAgent.Silo/Extensions/OrleansHostExtension.cs b/samples/PluginGAgent/PluginGAgent.Silo/Extensions/OrleansHostExtension.cs index 8c89370c..616a5a60 100644 --- a/samples/PluginGAgent/PluginGAgent.Silo/Extensions/OrleansHostExtension.cs +++ b/samples/PluginGAgent/PluginGAgent.Silo/Extensions/OrleansHostExtension.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using MongoDB.Driver; +using Orleans.Configuration; namespace PluginGAgent.Silo.Extensions; @@ -30,6 +31,11 @@ public static IHostBuilder UseOrleansConfiguration(this IHostBuilder hostBuilder }) .AddMemoryStreams("Aevatar") .UseAevatarPermissionManagement() + .Configure(options => + { + options.ClusterId = "default"; + options.ServiceId = "testServiceId"; + }) .UseAevatar() ; }) diff --git a/src/Aevatar.Core.Abstractions/AevatarCoreConstants.cs b/src/Aevatar.Core.Abstractions/AevatarCoreConstants.cs index f47f267a..14adaf38 100644 --- a/src/Aevatar.Core.Abstractions/AevatarCoreConstants.cs +++ b/src/Aevatar.Core.Abstractions/AevatarCoreConstants.cs @@ -3,5 +3,6 @@ namespace Aevatar.Core.Abstractions; public class AevatarCoreConstants { public const string StreamProvider = "Aevatar"; + public const string DefaultStreamNamespace = "Aevatar"; public const char GAgentNamespaceSeparator = '.'; } \ No newline at end of file diff --git a/src/Aevatar.Core.Abstractions/AevatarCoreStreamConfig.cs b/src/Aevatar.Core.Abstractions/AevatarCoreStreamConfig.cs new file mode 100644 index 00000000..2edacebe --- /dev/null +++ b/src/Aevatar.Core.Abstractions/AevatarCoreStreamConfig.cs @@ -0,0 +1,14 @@ +namespace Aevatar.Core.Abstractions; + +public static class AevatarCoreStreamConfig +{ + public static string Prefix { get; private set; } = AevatarCoreConstants.DefaultStreamNamespace; + + public static void Initialize(string? prefix) + { + if (prefix != null) + { + Prefix = prefix; + } + } +} \ No newline at end of file diff --git a/src/Aevatar.Core.Abstractions/EventPublish/IEventPubChildrenGroupGrain.cs b/src/Aevatar.Core.Abstractions/EventPublish/IEventPubChildrenGroupGrain.cs new file mode 100644 index 00000000..7f924191 --- /dev/null +++ b/src/Aevatar.Core.Abstractions/EventPublish/IEventPubChildrenGroupGrain.cs @@ -0,0 +1,12 @@ +namespace Aevatar.Core.Abstractions.EventPublish; + +public interface IEventPubChildrenGroupGrain : IGrainWithIntegerCompoundKey +{ + Task DownwardsEventAsync(EventWrapperBase eventWrapper); + Task UpwardsEventAsync(EventWrapperBase eventWrapper); + Task AddChildAsync(GrainId childGrainId); + Task AddManyChildAsync(List childrenGrainIds); + Task RemoveChildAsync(GrainId childId); + Task> GetChildrenAsync(); + Task GetChildrenCountAsync(); +} \ No newline at end of file diff --git a/src/Aevatar.Core.Abstractions/EventPublish/IEventPubGrain.cs b/src/Aevatar.Core.Abstractions/EventPublish/IEventPubGrain.cs new file mode 100644 index 00000000..95ebc35c --- /dev/null +++ b/src/Aevatar.Core.Abstractions/EventPublish/IEventPubGrain.cs @@ -0,0 +1,6 @@ +namespace Aevatar.Core.Abstractions.EventPublish; + +public interface IEventPubGrain : IGrainWithStringKey +{ + Task PublishEventAsync(EventWrapperBase eventWrapper); +} \ No newline at end of file diff --git a/src/Aevatar.Core.Abstractions/EventPublish/IStreamCoordinatorGrain.cs b/src/Aevatar.Core.Abstractions/EventPublish/IStreamCoordinatorGrain.cs new file mode 100644 index 00000000..8bfc0e1b --- /dev/null +++ b/src/Aevatar.Core.Abstractions/EventPublish/IStreamCoordinatorGrain.cs @@ -0,0 +1,22 @@ +using Orleans.Concurrency; + +namespace Aevatar.Core.Abstractions.EventPublish; + +public interface IStreamCoordinatorGrain : IGrainWithStringKey +{ + Task SetParentAsync(GrainId parentGrainId); + Task ClearParentAsync(GrainId parentGrainId); + Task SetGroupIndexAsync(int groupIndex); + [ReadOnly] + Task GetGroupIndexAsync(); + [ReadOnly] + Task GetParentAsync(); + Task RegisterChildAsync(GrainId childGrainId); + Task RegisterManyChildAsync(List childrenGrainIds); + Task UnregisterChildAsync(GrainId childGrainId); + [ReadOnly] + Task> GetChildrenAsync(); + Task PublishEventAsync(EventWrapperBase eventWrapper); + Task DownwardsEventAsync(EventWrapperBase eventWrapper); + Task UpwardsEventAsync(EventWrapperBase eventWrapper); +} \ No newline at end of file diff --git a/src/Aevatar.Core.Abstractions/IStateAgent.cs b/src/Aevatar.Core.Abstractions/IStateAgent.cs index 30d5d3da..fab85c03 100644 --- a/src/Aevatar.Core.Abstractions/IStateAgent.cs +++ b/src/Aevatar.Core.Abstractions/IStateAgent.cs @@ -1,4 +1,5 @@ using Orleans.Concurrency; +using Orleans.Streams; namespace Aevatar.Core.Abstractions; @@ -79,6 +80,18 @@ public interface IGAgent : IGrainWithGuidKey /// /// Task ConfigAsync(ConfigurationBase configuration); + + /// + /// Get GAgentAsyncObserver + /// + /// + Task> GetGAgentAsyncObserverAsync(); + + /// + /// Resume subscription of parent's stream. + /// + /// + Task ResumeSubscriptionAsync(IAsyncStream stream); } public interface IStateGAgent : IGAgent diff --git a/src/Aevatar.Core.Abstractions/StateBase.cs b/src/Aevatar.Core.Abstractions/StateBase.cs index f476b423..1f8aaeee 100644 --- a/src/Aevatar.Core.Abstractions/StateBase.cs +++ b/src/Aevatar.Core.Abstractions/StateBase.cs @@ -6,6 +6,7 @@ public abstract class StateBase [Id(0)] public List Children { get; set; } = []; [Id(1)] public GrainId? Parent { get; set; } [Id(2)] public string? GAgentCreator { get; set; } + [Id(3)] public GrainId StreamCoordinatorGrainId { get; set; } public void Apply(StateLogEventBase @stateLogEvent) { diff --git a/src/Aevatar.Core/AevatarGAgentConstants.cs b/src/Aevatar.Core/AevatarGAgentConstants.cs index 3c2e0a26..e009d5c2 100644 --- a/src/Aevatar.Core/AevatarGAgentConstants.cs +++ b/src/Aevatar.Core/AevatarGAgentConstants.cs @@ -7,4 +7,5 @@ public static class AevatarGAgentConstants public const string ConfigDefaultMethodName = "PerformConfigAsync"; public const string ForwardEventMethodName = "ForwardEventAsync"; public const int MaxSyncWorkConcurrency = 4; + public const int MaxChildrenPerGroup = 2000; } \ No newline at end of file diff --git a/src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs b/src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs new file mode 100644 index 00000000..a891442b --- /dev/null +++ b/src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs @@ -0,0 +1,116 @@ +using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; +using Aevatar.Core.Extensions; +using Orleans.EventSourcing; +using Orleans.Providers; +using Orleans.Streams; + +[GenerateSerializer] +public class EventPubChildrenGroupState : StateBase +{ + [Id(0)] public List Children { get; set; } = new(); + [Id(1)] public int ChildrenCount { get; set; } +} + +[GenerateSerializer] +public class EventPubChildrenGroupStateLogEvent : StateLogEventBase; + +[StorageProvider(ProviderName = "PubSubStore")] +[LogConsistencyProvider(ProviderName = "LogStorage")] +public class EventPubChildrenGroupGrain : + JournaledGrain>, + IEventPubChildrenGroupGrain +{ + private readonly IStreamProvider _streamProvider; + + public EventPubChildrenGroupGrain() + { + _streamProvider = this.GetStreamProvider(AevatarCoreConstants.StreamProvider); + } + + public async Task DownwardsEventAsync(EventWrapperBase eventWrapper) + { + foreach (var eventPubGrain in State.Children.Select(grainId => + GrainFactory.GetGrain(grainId.ToString()))) + { + await eventPubGrain.PublishEventAsync(eventWrapper); + } + } + + public async Task UpwardsEventAsync(EventWrapperBase eventWrapper) + { + var stream = _streamProvider.GetEventWrapperBaseStream(this.GetPrimaryKeyString()); + await stream.OnNextAsync(eventWrapper); + } + + public async Task AddChildAsync(GrainId childGrainId) + { + RaiseEvent(new AddChildStateLogEvent { ChildId = childGrainId }); + await ConfirmEvents(); + } + + public async Task AddManyChildAsync(List childrenGrainIds) + { + base.RaiseEvent(new AddChildManyStateLogEvent + { + ChildrenIds = childrenGrainIds + }); + await ConfirmEvents(); + } + + public async Task RemoveChildAsync(GrainId childId) + { + RaiseEvent(new RemoveChildStateLogEvent { ChildId = childId }); + await ConfirmEvents(); + } + + public Task> GetChildrenAsync() + { + return Task.FromResult(State.Children); + } + + public Task GetChildrenCountAsync() + { + return Task.FromResult(State.ChildrenCount); + } + + protected override void TransitionState(EventPubChildrenGroupState state, + StateLogEventBase @event) + { + switch (@event) + { + case AddChildStateLogEvent addChildEvent: + state.Children.Add(addChildEvent.ChildId); + state.ChildrenCount++; + break; + case AddChildManyStateLogEvent addChildManyEvent: + state.Children.AddRange(addChildManyEvent.ChildrenIds); + state.ChildrenCount += addChildManyEvent.ChildrenIds.Count; + break; + case RemoveChildStateLogEvent removeChildEvent: + state.Children.Remove(removeChildEvent.ChildId); + state.ChildrenCount--; + break; + } + + base.TransitionState(state, @event); + } + + [GenerateSerializer] + public class AddChildStateLogEvent : StateLogEventBase + { + [Id(0)] public required GrainId ChildId { get; set; } + } + + [GenerateSerializer] + public class AddChildManyStateLogEvent : StateLogEventBase + { + [Id(0)] public required List ChildrenIds { get; set; } + } + + [GenerateSerializer] + public class RemoveChildStateLogEvent : StateLogEventBase + { + [Id(0)] public required GrainId ChildId { get; set; } + } +} \ No newline at end of file diff --git a/src/Aevatar.Core/EventPublish/EventPubGrain.cs b/src/Aevatar.Core/EventPublish/EventPubGrain.cs new file mode 100644 index 00000000..3d9399e9 --- /dev/null +++ b/src/Aevatar.Core/EventPublish/EventPubGrain.cs @@ -0,0 +1,22 @@ +using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; +using Aevatar.Core.Extensions; +using Orleans.Streams; + +namespace Aevatar.Core.EventPublish; + +public class EventPubGrain : Grain, IEventPubGrain +{ + private readonly IStreamProvider _streamProvider; + + public EventPubGrain() + { + _streamProvider = this.GetStreamProvider(AevatarCoreConstants.StreamProvider); + } + + public async Task PublishEventAsync(EventWrapperBase eventWrapper) + { + var stream = _streamProvider.GetEventWrapperBaseStream(this.GetPrimaryKeyString()); + await stream.OnNextAsync(eventWrapper); + } +} \ No newline at end of file diff --git a/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs b/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs new file mode 100644 index 00000000..a45c0a7c --- /dev/null +++ b/src/Aevatar.Core/EventPublish/StreamCoordinatorGrain.cs @@ -0,0 +1,253 @@ +using Aevatar.Core; +using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; +using Aevatar.Core.Extensions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using Orleans.EventSourcing; +using Orleans.Providers; +using Orleans.Streams; + +[GenerateSerializer] +public class StreamCoordinatorState : StateBase +{ + /// + /// Group index -> Children count. + /// + [Id(1)] + public Dictionary GroupChildrenCount { get; set; } = new(); + + [Id(2)] public GrainId ParentGrainId { get; set; } + [Id(3)] public int GroupIndex { get; set; } +} + +[GenerateSerializer] +public class StreamCoordinatorStateLogEvent : StateLogEventBase; + +[StorageProvider(ProviderName = "PubSubStore")] +[LogConsistencyProvider(ProviderName = "LogStorage")] +public class StreamCoordinatorGrain : + JournaledGrain>, + IStreamCoordinatorGrain +{ + private readonly ILogger _logger; + private readonly IStreamProvider _streamProvider; + + public StreamCoordinatorGrain(ILogger logger) + { + _logger = logger; + _streamProvider = this.GetStreamProvider(AevatarCoreConstants.StreamProvider); + } + + public async Task SetParentAsync(GrainId parentGrainId) + { + if (State.ParentGrainId == parentGrainId) + { + return false; + } + RaiseEvent(new SetParentStateLogEvent{ParentGrainId = parentGrainId}); + await ConfirmEvents(); + return true; + } + + public async Task ClearParentAsync(GrainId parentGrainId) + { + RaiseEvent(new ClearParentStateLogEvent + { + Parent = parentGrainId + }); + await ConfirmEvents(); + } + + public async Task SetGroupIndexAsync(int groupIndex) + { + RaiseEvent(new SetGroupIndexStateLogEvent + { + GroupIndex = groupIndex + }); + await ConfirmEvents(); + } + + public Task GetGroupIndexAsync() + { + return Task.FromResult(State.GroupIndex); + } + + public Task GetParentAsync() + { + return Task.FromResult(State.ParentGrainId); + } + + public async Task RegisterChildAsync(GrainId childGrainId) + { + var groupIndex = State.GroupChildrenCount + .FirstOrDefault(kvp => kvp.Value <= AevatarGAgentConstants.MaxChildrenPerGroup).Key; + var childGroupGrain = GetChildGroupGrain(groupIndex); + var childrenCount = await childGroupGrain.GetChildrenCountAsync(); + await childGroupGrain.AddChildAsync(childGrainId); + RaiseEvent(new UpdateGroupChildrenCountStateLogEvent { GroupIndex = groupIndex, ChildrenCount = childrenCount + 1 }); + await ConfirmEvents(); + return groupIndex; + } + + public async Task RegisterManyChildAsync(List childrenGrainIds) + { + var count = childrenGrainIds.Count; + var groupIndex = State.GroupChildrenCount + .FirstOrDefault(kvp => kvp.Value <= AevatarGAgentConstants.MaxChildrenPerGroup - count).Key; + var childGroupGrain = GetChildGroupGrain(groupIndex); + var childrenCount = await childGroupGrain.GetChildrenCountAsync(); + await childGroupGrain.AddManyChildAsync(childrenGrainIds); + RaiseEvent(new UpdateGroupChildrenCountStateLogEvent { GroupIndex = groupIndex, ChildrenCount = childrenCount + count }); + await ConfirmEvents(); + return groupIndex; + } + + public async Task UnregisterChildAsync(GrainId childGrainId) + { + var childStreamCoordinator = GrainFactory.GetGrain(childGrainId.ToString()); + await childStreamCoordinator.ClearParentAsync(this.GetGrainId()); + var groupIndex = await childStreamCoordinator.GetGroupIndexAsync(); + var childGroupGrain = + GrainFactory.GetGrain(groupIndex, this.GetPrimaryKeyString()); + var children = await childGroupGrain.GetChildrenAsync(); + if (children.Contains(childGrainId)) + { + await childGroupGrain.RemoveChildAsync(childGrainId); + RaiseEvent(new UnregisterChildStateLogEvent { ChildGrainId = childGrainId, GroupIndex = groupIndex }); + await ConfirmEvents(); + } + else + { + _logger.LogError($"Not found child {childGrainId} in group {groupIndex}."); + } + } + + public async Task> GetChildrenAsync() + { + var allChildren = new List(); + foreach (var group in State.GroupChildrenCount) + { + var childGroupGrain = + GrainFactory.GetGrain(group.Key, this.GetPrimaryKeyString()); + var children = await childGroupGrain.GetChildrenAsync(); + allChildren.AddRange(children); + } + + return allChildren; + } + + public async Task PublishEventAsync(EventWrapperBase eventWrapper) + { + if (State.ParentGrainId == default) + { + _logger.LogInformation( + "Event is the first time appeared to silo: {@Event}", eventWrapper); + await DownwardsEventAsync(eventWrapper); + } + else + { + _logger.LogInformation( + "{GrainId} is publishing event upwards: {EventJson}", this.GetPrimaryKeyString(), + JsonConvert.SerializeObject(eventWrapper)); + await UpwardsEventAsync(eventWrapper); + } + } + + public async Task DownwardsEventAsync(EventWrapperBase eventWrapper) + { + foreach (var (groupIndex, _) in State.GroupChildrenCount) + { + var childrenGroupGrain = GetChildGroupGrain(groupIndex); + await childrenGroupGrain.DownwardsEventAsync(eventWrapper); + } + } + + public async Task UpwardsEventAsync(EventWrapperBase eventWrapper) + { + // Try self handling + if (eventWrapper.GetPublisherGrainId().ToString() != this.GetPrimaryKeyString()) + { + var selfEventPubGrain = GrainFactory.GetGrain($"{this.GetPrimaryKeyString()}"); + await selfEventPubGrain.PublishEventAsync(eventWrapper); + } + + // Parent handling + if (State.ParentGrainId != default) + { + // var parentEventPubGrain = GrainFactory.GetGrain(State.Parent.ToString()); + // await parentEventPubGrain.PublishEventAsync(eventWrapper); + // To avoid too many producers on parent's corresponding PubSubRendezvousGrain state. + var parentStream = _streamProvider.GetEventWrapperBaseStream(State.ParentGrainId); + await parentStream.OnNextAsync(eventWrapper); + } + } + + private IEventPubChildrenGroupGrain GetChildGroupGrain(int groupIndex) + { + return GrainFactory.GetGrain(groupIndex, this.GetPrimaryKeyString()); + } + + protected override void TransitionState(StreamCoordinatorState state, + StateLogEventBase @event) + { + switch (@event) + { + case SetParentStateLogEvent setParentEvent: + State.ParentGrainId = setParentEvent.ParentGrainId; + break; + case ClearParentStateLogEvent clearParentStateLogEvent: + if (clearParentStateLogEvent.Parent.ToString().Contains(State.ParentGrainId.ToString())) + State.ParentGrainId = default; + break; + case SetGroupIndexStateLogEvent setGroupIndexStateLogEvent: + State.GroupIndex = setGroupIndexStateLogEvent.GroupIndex; + break; + case UpdateGroupChildrenCountStateLogEvent updateEvent: + State.GroupChildrenCount[updateEvent.GroupIndex] = updateEvent.ChildrenCount; + if (updateEvent.ChildrenCount == AevatarGAgentConstants.MaxChildrenPerGroup) + State.GroupChildrenCount[updateEvent.GroupIndex + 1] = 0; + break; + case UnregisterChildStateLogEvent unregisterEvent: + if (state.GroupChildrenCount.ContainsKey(unregisterEvent.GroupIndex)) + state.GroupChildrenCount[unregisterEvent.GroupIndex] -= 1; + break; + } + + base.TransitionState(state, @event); + } + + [GenerateSerializer] + public class SetParentStateLogEvent : StateLogEventBase + { + [Id(0)] public GrainId ParentGrainId { get; set; } + } + + [GenerateSerializer] + public class UpdateGroupChildrenCountStateLogEvent : StateLogEventBase + { + [Id(0)] public int GroupIndex { get; set; } + [Id(1)] public int ChildrenCount { get; set; } + } + + [GenerateSerializer] + public class UnregisterChildStateLogEvent : StateLogEventBase + { + [Id(0)] public GrainId ChildGrainId { get; set; } + [Id(1)] public int GroupIndex { get; set; } + } + + [GenerateSerializer] + public class ClearParentStateLogEvent : StateLogEventBase + { + [Id(0)] public GrainId Parent { get; set; } + } + + [GenerateSerializer] + public class SetGroupIndexStateLogEvent : StateLogEventBase + { + [Id(0)] public int GroupIndex { get; set; } + } +} \ No newline at end of file diff --git a/src/Aevatar.Core/Extensions/EventWrapperBaseExtensions.cs b/src/Aevatar.Core/Extensions/EventWrapperBaseExtensions.cs new file mode 100644 index 00000000..6cb5de5d --- /dev/null +++ b/src/Aevatar.Core/Extensions/EventWrapperBaseExtensions.cs @@ -0,0 +1,36 @@ +using Aevatar.Core.Abstractions; + +namespace Aevatar.Core.Extensions; + +public static class EventWrapperBaseExtensions +{ + public static GrainId GetGrainId(this EventWrapperBase eventWrapper) + { + return (GrainId)eventWrapper.GetType().GetProperty(nameof(EventWrapper.GrainId)) + ?.GetValue(eventWrapper)!; + } + + public static GrainId GetPublisherGrainId(this EventWrapperBase eventWrapper) + { + return (GrainId)eventWrapper.GetType().GetProperty(nameof(EventWrapper.PublisherGrainId)) + ?.GetValue(eventWrapper)!; + } + + public static Guid? GetCorrelationId(this EventWrapperBase eventWrapper) + { + return (Guid?)eventWrapper.GetType().GetProperty(nameof(EventWrapper.CorrelationId)) + ?.GetValue(eventWrapper)!; + } + + public static EventBase GetEvent(this EventWrapperBase eventWrapper) + { + return (EventBase)eventWrapper.GetType().GetProperty(nameof(EventWrapper.Event)) + ?.GetValue(eventWrapper)!; + } + + public static Guid GetEventId(this EventWrapperBase eventWrapper) + { + return (Guid)eventWrapper.GetType().GetProperty(nameof(EventWrapper.EventId)) + ?.GetValue(eventWrapper)!; + } +} \ No newline at end of file diff --git a/src/Aevatar.Core/Extensions/StreamExtensions.cs b/src/Aevatar.Core/Extensions/StreamExtensions.cs new file mode 100644 index 00000000..648ad7e6 --- /dev/null +++ b/src/Aevatar.Core/Extensions/StreamExtensions.cs @@ -0,0 +1,16 @@ +using Aevatar.Core.Abstractions; +using Orleans.Streams; + +namespace Aevatar.Core.Extensions; + +public static class StreamExtensions +{ + public static IAsyncStream GetEventWrapperBaseStream(this IStreamProvider streamProvider, + string streamKey) + => streamProvider.GetStream(StreamId.Create(AevatarCoreStreamConfig.Prefix, streamKey)); + + public static IAsyncStream GetEventWrapperBaseStream(this IStreamProvider streamProvider, + GrainId grainId) + => streamProvider.GetStream(StreamId.Create(AevatarCoreStreamConfig.Prefix, + grainId.ToString())); +} \ No newline at end of file diff --git a/src/Aevatar.Core/GAgentBase.Observers.cs b/src/Aevatar.Core/GAgentBase.Observers.cs index 035edbe7..cb15d2f6 100644 --- a/src/Aevatar.Core/GAgentBase.Observers.cs +++ b/src/Aevatar.Core/GAgentBase.Observers.cs @@ -97,12 +97,9 @@ private EventWrapperBaseAsyncObserver CreateMethodObserver( { try { - var eventId = (Guid)item.GetType().GetProperty(nameof(EventWrapper.EventId)) - ?.GetValue(item)!; - var eventType = (TEvent)item.GetType().GetProperty(nameof(EventWrapper.Event)) - ?.GetValue(item)!; - var grainId = (GrainId)item.GetType().GetProperty(nameof(EventWrapper.GrainId)) - ?.GetValue(item)!; + var eventId = item.GetEventId(); + var eventType = (TEvent)item.GetEvent(); + var grainId = item.GetGrainId(); var eventWrapper = new EventWrapper(eventType, eventId, grainId); diff --git a/src/Aevatar.Core/GAgentBase.Publish.cs b/src/Aevatar.Core/GAgentBase.Publish.cs index 1754fb8f..f0f27f6c 100644 --- a/src/Aevatar.Core/GAgentBase.Publish.cs +++ b/src/Aevatar.Core/GAgentBase.Publish.cs @@ -14,8 +14,8 @@ protected async Task PublishAsync(EventWrapper eventWrapper) where T : Eve { try { - await SendEventUpwardsAsync(eventWrapper); - await SendEventDownwardsAsync(eventWrapper); + await _coordinator!.UpwardsEventAsync(eventWrapper); + await _coordinator!.DownwardsEventAsync(eventWrapper); } catch (Exception ex) { @@ -35,20 +35,8 @@ protected async Task PublishAsync(T @event) where T : EventBase var eventId = Guid.NewGuid(); try { - if (State.Parent == null) - { - Logger.LogInformation( - "Event is the first time appeared to silo: {@Event}", @event); - // This event is the first time appeared to silo. - await SendEventToSelfAsync(new EventWrapper(@event, eventId, GrainId)); - } - else - { - Logger.LogInformation( - "{GrainId} is publishing event upwards: {EventJson}", - GrainId.ToString(), JsonConvert.SerializeObject(@event)); - await PublishEventUpwardsAsync(@event, eventId); - } + var eventWrapper = new EventWrapper(@event, eventId, GrainId); + await _coordinator!.PublishEventAsync(eventWrapper); } catch (Exception ex) { @@ -59,81 +47,4 @@ protected async Task PublishAsync(T @event) where T : EventBase return eventId; } - - private async Task PublishEventUpwardsAsync(T @event, Guid eventId) where T : EventBase - { - try - { - await SendEventUpwardsAsync(new EventWrapper(@event, eventId, GrainId)); - Logger.LogDebug("{GrainId} published {Event} to upwards", GrainId.ToString(), - JsonConvert.SerializeObject(@event)); - } - catch (Exception ex) - { - Logger.LogError("{GrainId} failed to publish event {EventJson} upwards", GrainId.ToString(), - JsonConvert.SerializeObject(@event)); - throw new EventPublishingException($"{GrainId.ToString()} failed to publish event upwards", ex); - } - } - - private async Task SendEventUpwardsAsync(EventWrapper eventWrapper) where T : EventBase - { - if (State.Parent == null) - { - return; - } - - try - { - var stream = GetEventBaseStream(State.Parent.Value); - await stream.OnNextAsync(eventWrapper); - } - catch (Exception ex) - { - Logger.LogError("{GrainId} failed to send event {EventWrapper} upwards", GrainId.ToString(), - eventWrapper); - throw new EventPublishingException($"{GrainId.ToString()} failed to event event upwards", ex); - } - } - - private async Task SendEventToSelfAsync(EventWrapper eventWrapper) where T : EventBase - { - Logger.LogInformation( - $"{GrainId.ToString()} is sending event to self: {JsonConvert.SerializeObject(eventWrapper)}"); - try - { - var streamOfThisGAgent = GetEventBaseStream(GrainId); - await streamOfThisGAgent.OnNextAsync(eventWrapper); - } - catch (Exception ex) - { - Logger.LogError("{GrainId} failed to send event {EventWrapper} to itself", GrainId.ToString(), - eventWrapper); - throw new EventPublishingException($"{GrainId.ToString()} failed to event event to itself", ex); - } - } - - private async Task SendEventDownwardsAsync(EventWrapper eventWrapper) where T : EventBase - { - if (State.Children.IsNullOrEmpty()) - { - return; - } - - Logger.LogInformation($"{GrainId.ToString()} has {State.Children.Count} children."); - - try - { - foreach (var stream in State.Children.Select(GetEventBaseStream)) - { - await stream.OnNextAsync(eventWrapper); - } - } - catch (Exception ex) - { - Logger.LogError("{GrainId} failed to send event {EventWrapper} downwards", GrainId.ToString(), - eventWrapper); - throw new EventPublishingException($"{GrainId.ToString()} failed to event event downwards", ex); - } - } } \ No newline at end of file diff --git a/src/Aevatar.Core/GAgentBase.Subscribe.cs b/src/Aevatar.Core/GAgentBase.Subscribe.cs index e13af7a6..ba3c78eb 100644 --- a/src/Aevatar.Core/GAgentBase.Subscribe.cs +++ b/src/Aevatar.Core/GAgentBase.Subscribe.cs @@ -7,37 +7,13 @@ public abstract partial class GAgentBase @event) { - switch (@event) - { - case AddChildStateLogEvent addChildEvent: - Logger.LogDebug("GrainId {GrainId}: Adding child {Child}", this.GetGrainId().ToString(), addChildEvent.Child); - State.Children.Add(addChildEvent.Child); - break; - case AddChildManyStateLogEvent addChildManyEvent: - Logger.LogDebug("GrainId {GrainId}: Adding children {Child}", this.GetGrainId().ToString(), addChildManyEvent.Children); - State.Children.AddRange(addChildManyEvent.Children); - break; - case RemoveChildStateLogEvent removeChildEvent: - Logger.LogDebug("GrainId {GrainId}: Removing child {Child}", this.GetGrainId().ToString(), removeChildEvent.Child); - State.Children.Remove(removeChildEvent.Child); - break; - case SetParentStateLogEvent setParentEvent: - Logger.LogDebug("GrainId {GrainId}: Setting parent to {Parent}", this.GetGrainId().ToString(), setParentEvent.Parent); - State.Parent = setParentEvent.Parent; - break; - case ClearParentStateLogEvent clearParentEvent: - Logger.LogDebug("GrainId {GrainId}: Clearing parent {Parent}", this.GetGrainId().ToString(), clearParentEvent.Parent); - if (State.Parent == clearParentEvent.Parent) - State.Parent = default; - break; - } - GAgentTransitionState(state, @event); - - Logger.LogInformation("GrainId {GrainId}: State before transition: {@State}", this.GetGrainId().ToString(), State); - + + Logger.LogInformation("GrainId {GrainId}: State before transition: {@State}", this.GetGrainId().ToString(), + State); + base.TransitionState(state, @event); - + // print out the state after transition Logger.LogDebug("GrainId {GrainId}: State after transition: {@State}", this.GetGrainId().ToString(), State); } @@ -54,7 +30,7 @@ private async Task AddChildAsync(GrainId grainId) Logger.LogError($"Cannot add duplicate child {grainId}."); return; } - + Logger.LogDebug("GrainId [{GrainId}] Adding child to {Parent}", this.GetGrainId().ToString(), grainId); base.RaiseEvent(new AddChildStateLogEvent @@ -64,75 +40,16 @@ private async Task AddChildAsync(GrainId grainId) await ConfirmEvents(); } - private async Task AddChildManyAsync(List grainIds) - { - base.RaiseEvent(new AddChildManyStateLogEvent - { - Children = grainIds - }); - await ConfirmEvents(); - } - [GenerateSerializer] public class AddChildStateLogEvent : StateLogEventBase { [Id(0)] public GrainId Child { get; set; } } - [GenerateSerializer] - public class AddChildManyStateLogEvent : StateLogEventBase - { - [Id(0)] public required List Children { get; set; } - } - - private async Task RemoveChildAsync(GrainId grainId) - { - Logger.LogDebug("GrainId [{GrainId}] Removing child to {Parent}", this.GetGrainId().ToString(), grainId); - if (!State.Children.IsNullOrEmpty()) - { - base.RaiseEvent(new RemoveChildStateLogEvent - { - Child = grainId - }); - await ConfirmEvents(); - } - } [GenerateSerializer] public class RemoveChildStateLogEvent : StateLogEventBase { [Id(0)] public GrainId Child { get; set; } } - - [GenerateSerializer] - public class SetParentStateLogEvent : StateLogEventBase - { - [Id(0)] public GrainId Parent { get; set; } - } - - private async Task SetParentAsync(GrainId grainId) - { - Logger.LogDebug("GrainId [{GrainId}] Setting parent to {Parent}", this.GetGrainId().ToString(), grainId); - base.RaiseEvent(new SetParentStateLogEvent - { - Parent = grainId - }); - await ConfirmEvents(); - } - - [GenerateSerializer] - public class ClearParentStateLogEvent : StateLogEventBase - { - [Id(0)] public GrainId Parent { get; set; } - } - - private async Task ClearParentAsync(GrainId grainId) - { - Logger.LogDebug("GrainId [{GrainId}] Removing parent to {Parent}", this.GetGrainId().ToString(), grainId); - base.RaiseEvent(new ClearParentStateLogEvent - { - Parent = grainId - }); - await ConfirmEvents(); - } } \ No newline at end of file diff --git a/src/Aevatar.Core/GAgentBase.SyncWorker.cs b/src/Aevatar.Core/GAgentBase.SyncWorker.cs index d222719b..1238ac1b 100644 --- a/src/Aevatar.Core/GAgentBase.SyncWorker.cs +++ b/src/Aevatar.Core/GAgentBase.SyncWorker.cs @@ -1,5 +1,6 @@ using Aevatar.Core.Abstractions; using Aevatar.Core.Abstractions.SyncWorker; +using Aevatar.Core.Extensions; using Microsoft.Extensions.Logging; using Orleans.SyncWork; @@ -14,7 +15,7 @@ protected async Task CreateLongRunTaskAsync(TRequest reques try { var syncWorker = GrainFactory.GetGrain>(Guid.NewGuid()); - await syncWorker.SetLongRunTaskAsync(GetEventBaseStream(GrainId)); + await syncWorker.SetLongRunTaskAsync(StreamProvider.GetEventWrapperBaseStream(GrainId)); await syncWorker.StartWorkAndPollUntilResult(request); } catch (Exception ex) diff --git a/src/Aevatar.Core/GAgentBase.cs b/src/Aevatar.Core/GAgentBase.cs index 5429fe65..bd0d5d7a 100644 --- a/src/Aevatar.Core/GAgentBase.cs +++ b/src/Aevatar.Core/GAgentBase.cs @@ -1,6 +1,7 @@ -using System.Collections.Concurrent; using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; using Aevatar.Core.Abstractions.Projections; +using Aevatar.Core.Extensions; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; @@ -45,7 +46,11 @@ public abstract partial class private Lazy LazyStreamProvider => new(() => this.GetStreamProvider(AevatarCoreConstants.StreamProvider)); + private Lazy LazyGAgentFactory => new(() + => ServiceProvider.GetRequiredService()); + protected IStreamProvider StreamProvider => LazyStreamProvider.Value; + protected IGAgentFactory GAgentFactory => LazyGAgentFactory.Value; public ILogger Logger { get; set; } = NullLogger.Instance; @@ -54,6 +59,8 @@ public abstract partial class private IStateDispatcher? StateDispatcher { get; set; } protected AevatarOptions? AevatarOptions { get; private set; } + private IStreamCoordinatorGrain? _coordinator; + public async Task ActivateAsync() { await Task.Yield(); @@ -61,15 +68,22 @@ public async Task ActivateAsync() public async Task RegisterAsync(IGAgent gAgent) { - if (gAgent.GetGrainId() == this.GetGrainId()) + var grainId = gAgent.GetGrainId(); + if (grainId == this.GetGrainId()) { Logger.LogError($"Cannot register GAgent with same GrainId."); return; } - await AddChildAsync(gAgent.GetGrainId()); - await gAgent.SubscribeToAsync(this); - await OnRegisterAgentAsync(gAgent.GetGrainId()); + Logger.LogDebug("GrainId [{GrainId}] register child {Parent}", GrainId.ToString(), grainId.ToString()); + + var childStreamCoordinator = GrainFactory.GetGrain(grainId.ToString()); + if (await childStreamCoordinator.SetParentAsync(GrainId)) + { + var groupIndex = await _coordinator!.RegisterChildAsync(grainId); + await childStreamCoordinator.SetGroupIndexAsync(groupIndex); + await OnRegisterAgentAsync(grainId); + } } public async Task RegisterManyAsync(List gAgents) @@ -86,35 +100,51 @@ public async Task RegisterManyAsync(List gAgents) } var grainIds = gAgents.Select(g => g.GetGrainId()).ToList(); - var tasks = new List(); + var successGrainIds = new List(); + var streamCoordinators = new List(); foreach (var gAgent in gAgents) { - tasks.Add(gAgent.SubscribeToAsync(this)); + var childStreamCoordinator = GrainFactory.GetGrain(gAgent.GetGrainId().ToString()); + streamCoordinators.Add(childStreamCoordinator); + if (await childStreamCoordinator.SetParentAsync(GrainId)) + { + successGrainIds.Add(gAgent.GetGrainId()); + } + } + + // TODO: Optimize. + var groupIndex = await _coordinator!.RegisterManyChildAsync(successGrainIds); + foreach (var coordinator in streamCoordinators) + { + await coordinator.SetGroupIndexAsync(groupIndex); } - tasks.Add(AddChildManyAsync(grainIds)); - tasks.Add(OnRegisterAgentManyAsync(grainIds)); - await Task.WhenAll(tasks); + await OnRegisterAgentManyAsync(grainIds); } public async Task SubscribeToAsync(IGAgent gAgent) { - await SetParentAsync(gAgent.GetGrainId()); + var grainId = gAgent.GetGrainId(); + Logger.LogDebug("GrainId [{GrainId}] subscribe to {Parent}", GrainId.ToString(), grainId.ToString()); + await _coordinator!.SetParentAsync(grainId); } - public Task UnsubscribeFromAsync(IGAgent gAgent) + public async Task UnsubscribeFromAsync(IGAgent gAgent) { - return ClearParentAsync(gAgent.GetGrainId()); + var grainId = gAgent.GetGrainId(); + Logger.LogDebug("GrainId [{GrainId}] unsubscribe from {Parent}", GrainId.ToString(), grainId.ToString()); + await _coordinator!.ClearParentAsync(grainId); } public async Task UnregisterAsync(IGAgent gAgent) { - await RemoveChildAsync(gAgent.GetGrainId()); - await gAgent.UnsubscribeFromAsync(this); + var grainId = gAgent.GetGrainId(); + Logger.LogDebug("GrainId [{GrainId}] unregister child {Child}", GrainId.ToString(), grainId.ToString()); + await _coordinator!.UnregisterChildAsync(gAgent.GetGrainId()); await OnUnregisterAgentAsync(gAgent.GetGrainId()); } - public virtual Task?> GetAllSubscribedEventsAsync(bool includeBaseHandlers = false) + public async virtual Task?> GetAllSubscribedEventsAsync(bool includeBaseHandlers = false) { var eventHandlerMethods = GetEventHandlerMethods(GetType()); eventHandlerMethods = eventHandlerMethods.Where(m => @@ -126,17 +156,17 @@ public async Task UnregisterAsync(IGAgent gAgent) handlingTypes = handlingTypes.Where(t => t != typeof(RequestAllSubscriptionsEvent)); } - return Task.FromResult(handlingTypes.ToList())!; + return handlingTypes.ToList(); } - public Task> GetChildrenAsync() + public async Task> GetChildrenAsync() { - return Task.FromResult(State.Children); + return await _coordinator!.GetChildrenAsync(); } - public Task GetParentAsync() + public async Task GetParentAsync() { - return Task.FromResult(State.Parent ?? default); + return await _coordinator!.GetParentAsync(); } public virtual Task GetConfigurationTypeAsync() @@ -152,6 +182,18 @@ public async Task ConfigAsync(ConfigurationBase configuration) } } + public async Task> GetGAgentAsyncObserverAsync() + { + var asyncObserver = new GAgentAsyncObserver(_observers); + return asyncObserver; + } + + public async Task ResumeSubscriptionAsync(IAsyncStream stream) + { + var asyncObserver = new GAgentAsyncObserver(_observers); + await ResumeOrSubscribeAsync(stream, asyncObserver); + } + protected virtual Task PerformConfigAsync(TConfiguration configuration) { return Task.CompletedTask; @@ -167,9 +209,12 @@ public async Task HandleRequestAllSubscriptionsEventAs private async Task GetGroupSubscribedEventListEvent() { - var gAgentList = State.Children + var children = await _coordinator!.GetChildrenAsync(); + var gAgentList = children .Distinct() .Select(grainId => GrainFactory.GetGrain(grainId)) + .GroupBy(g => g.GetType()) + .Select(g => g.First()) .ToList(); if (gAgentList.IsNullOrEmpty()) @@ -219,7 +264,7 @@ protected virtual async Task ForwardEventAsync(EventWrapperBase eventWrapper) })) { Logger.LogDebug("Forwarding event to children: {Event}", JsonConvert.SerializeObject(typedWrapper)); - await SendEventDownwardsAsync(typedWrapper); + await _coordinator!.DownwardsEventAsync(eventWrapper); } } @@ -265,17 +310,21 @@ private async Task BaseOnActivateAsync(CancellationToken cancellationToken) // This must be called first to initialize Observers field. await UpdateObserverListAsync(GetType()); - var initTasks = new[] - { - InitializeOrResumeEventBaseStreamAsync(), - ActivateProjectionGrainAsync() - }; - await Task.WhenAll(initTasks); + _coordinator = GrainFactory.GetGrain( + this.GetGrainId().ToString()); + + await InitializeOrResumeEventBaseStreamAsync(); + await ActivateProjectionGrainAsync(); } private async Task InitializeOrResumeEventBaseStreamAsync() { - var streamOfThisGAgent = GetEventBaseStream(this.GetGrainId()); + if (_observers.Count == 0) + { + return; + } + + var streamOfThisGAgent = StreamProvider.GetEventWrapperBaseStream(GrainId); var asyncObserver = new GAgentAsyncObserver(_observers); await ResumeOrSubscribeAsync(streamOfThisGAgent, asyncObserver); } @@ -359,11 +408,4 @@ protected virtual Task HandleRaiseEventAsync() // Derived classes can override this method. return Task.CompletedTask; } - - private IAsyncStream GetEventBaseStream(GrainId grainId) - { - var grainIdString = grainId.ToString(); - var streamId = StreamId.Create(AevatarOptions!.StreamNamespace, grainIdString); - return StreamProvider.GetStream(streamId); - } } \ No newline at end of file diff --git a/src/Aevatar/Extensions/OrleansHostExtensions.cs b/src/Aevatar/Extensions/OrleansHostExtensions.cs index 7984a8c0..c8dd871a 100644 --- a/src/Aevatar/Extensions/OrleansHostExtensions.cs +++ b/src/Aevatar/Extensions/OrleansHostExtensions.cs @@ -1,3 +1,4 @@ +using Aevatar.Core.Abstractions; using Aevatar.Core; using Aevatar.Plugins.Extensions; using Microsoft.Extensions.DependencyInjection; @@ -26,6 +27,9 @@ public static ISiloBuilder UseAevatar(this ISiloBuilder builder, bool includingA services.Add(abpApplication.Services); } + var prefix = services.GetConfiguration().GetSection("Aevatar").GetSection("StreamNamespace").Value; + AevatarCoreStreamConfig.Initialize(prefix); + services.AddSingleton(_ => new LimitedConcurrencyLevelTaskScheduler(AevatarGAgentConstants.MaxSyncWorkConcurrency)); }); diff --git a/test/Aevatar.Core.Tests/EventHandlingTests.cs b/test/Aevatar.Core.Tests/EventHandlingTests.cs index 14a10e31..7dac028e 100644 --- a/test/Aevatar.Core.Tests/EventHandlingTests.cs +++ b/test/Aevatar.Core.Tests/EventHandlingTests.cs @@ -23,7 +23,7 @@ public async Task EventHandlerRecognizeTest() subscribedEventList.ShouldNotBeNull(); subscribedEventList.Count.ShouldBe(4); subscribedEventList.ShouldContain(typeof(NaiveTestEvent)); - subscribedEventList.Count(e => e == typeof(NaiveTestEvent)).ShouldBe(2); + subscribedEventList.Count(e => e == typeof(NaiveTestEvent)).ShouldBe(3); subscribedEventList.ShouldContain(typeof(EventWrapperBase)); } @@ -33,9 +33,9 @@ public async Task EventHandlerRecognizeTest() // Assert. subscribedEventList.ShouldNotBeNull(); - subscribedEventList.Count.ShouldBe(4); + subscribedEventList.Count.ShouldBe(5); subscribedEventList.ShouldContain(typeof(NaiveTestEvent)); - subscribedEventList.Count(e => e == typeof(NaiveTestEvent)).ShouldBe(2); + subscribedEventList.Count(e => e == typeof(NaiveTestEvent)).ShouldBe(3); subscribedEventList.ShouldContain(typeof(EventWrapperBase)); subscribedEventList.ShouldContain(typeof(RequestAllSubscriptionsEvent)); } @@ -49,7 +49,7 @@ public async Task EventHandlerTest() var groupGAgent = await CreateGroupGAgentAsync(eventHandlerTestGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(eventHandlerTestGAgent, groupGAgent, publishingGAgent); + await AddProbesByGrainIdAsync(eventHandlerTestGAgent, groupGAgent, publishingGAgent); // Act of registering. await publishingGAgent.PublishEventAsync(new NaiveTestEvent @@ -87,7 +87,7 @@ public async Task EventWithResponseTest() var groupGAgent = await CreateGroupGAgentAsync(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent, groupGAgent, publishingGAgent); + AddProbesByGrainIdAsync(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent, groupGAgent, publishingGAgent); // Act. await publishingGAgent.PublishEventAsync(new ResponseTestEvent @@ -169,7 +169,7 @@ public async Task RequestSubscribedEventListTest() subscribeTestGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent, subscribeTestGAgent, groupGAgent, + await AddProbesByGrainIdAsync(eventHandlerTestGAgent, eventHandlerWithResponseTestGAgent, subscribeTestGAgent, groupGAgent, publishingGAgent); // Act. @@ -177,8 +177,8 @@ public async Task RequestSubscribedEventListTest() // Assert. var state = await subscribeTestGAgent.GetStateAsync(); - state.SubscriptionInfo.Count.ShouldBe(4); - state.SubscriptionInfo[typeof(EventHandlerTestGAgent)].Count.ShouldBe(3); + state.SubscriptionInfo.Count.ShouldBe(3); + state.SubscriptionInfo[typeof(EventHandlerTestGAgent)].Count.ShouldBe(4); state.SubscriptionInfo[typeof(EventHandlerWithResponseTestGAgent)].Count.ShouldBe(1); state.SubscriptionInfo[typeof(SubscribeTestGAgent)].Count.ShouldBe(1); } @@ -192,7 +192,7 @@ public async Task ExceptionHandlingTest() var groupGAgent = await CreateGroupGAgentAsync(eventHandlerTestGAgent, testGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(eventHandlerTestGAgent, testGAgent, groupGAgent, publishingGAgent); + await AddProbesByGrainIdAsync(eventHandlerTestGAgent, testGAgent, groupGAgent, publishingGAgent); // Act. await publishingGAgent.PublishEventAsync(new NaiveTestEvent diff --git a/test/Aevatar.Core.Tests/EventSourcingTests.cs b/test/Aevatar.Core.Tests/EventSourcingTests.cs index 481d4518..aa3d7d1d 100644 --- a/test/Aevatar.Core.Tests/EventSourcingTests.cs +++ b/test/Aevatar.Core.Tests/EventSourcingTests.cs @@ -19,7 +19,7 @@ public async Task LogViewAdaptorTest() var groupGAgent = await CreateGroupGAgentAsync(logViewGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(publishingGAgent, groupGAgent, logViewGAgent); + AddProbesByGrainIdAsync(publishingGAgent, groupGAgent, logViewGAgent); // Act: First event. await publishingGAgent.PublishEventAsync(new NaiveTestEvent @@ -56,15 +56,12 @@ await publishingGAgent.PublishEventAsync(new NaiveTestEvent (await logViewGAgent.GetStateAsync()).Content.Count.ShouldBe(3); } - const int minimum = 1; // SetParent or AddChildren event. // Asset: Check the log storage. InMemoryLogConsistentStorage.Storage.Count.ShouldBeGreaterThanOrEqualTo(3); InMemoryLogConsistentStorage.Storage.ShouldContainKey(GetStreamName(logViewGAgent.GetGrainId())); - InMemoryLogConsistentStorage.Storage[GetStreamName(logViewGAgent.GetGrainId())].Count.ShouldBe(minimum + 3); + InMemoryLogConsistentStorage.Storage[GetStreamName(logViewGAgent.GetGrainId())].Count.ShouldBe(3); InMemoryLogConsistentStorage.Storage.ShouldContainKey(GetStreamName(groupGAgent.GetGrainId())); - InMemoryLogConsistentStorage.Storage[GetStreamName(groupGAgent.GetGrainId())].Count.ShouldBe(minimum + 2); - InMemoryLogConsistentStorage.Storage.ShouldContainKey(GetStreamName(publishingGAgent.GetGrainId())); - InMemoryLogConsistentStorage.Storage[GetStreamName(publishingGAgent.GetGrainId())].Count.ShouldBe(minimum); + InMemoryLogConsistentStorage.Storage[GetStreamName(groupGAgent.GetGrainId())].Count.ShouldBe(1); } private async Task CheckCount(LogViewAdaptorTestGAgent gAgent, int expectedCount) diff --git a/test/Aevatar.Core.Tests/GAgentTestKitBase.cs b/test/Aevatar.Core.Tests/GAgentTestKitBase.cs index 01be8421..fe2e49b4 100644 --- a/test/Aevatar.Core.Tests/GAgentTestKitBase.cs +++ b/test/Aevatar.Core.Tests/GAgentTestKitBase.cs @@ -30,7 +30,7 @@ protected async Task CreateGroupGAgentAsync(params IGAgent[] gAgent return groupGAgent; } - protected void AddProbesByGrainId(params IGAgent?[] gAgents) + protected async Task AddProbesByGrainIdAsync(params IGAgent?[] gAgents) { foreach (var gAgent in gAgents) { diff --git a/test/Aevatar.Core.Tests/GroupingTests.cs b/test/Aevatar.Core.Tests/GroupingTests.cs index bc3415a9..187ecb84 100644 --- a/test/Aevatar.Core.Tests/GroupingTests.cs +++ b/test/Aevatar.Core.Tests/GroupingTests.cs @@ -72,9 +72,9 @@ public async Task MultipleGroupRegisterOneGAgentTest() // Assert: Check each group's states from GrainStorage. foreach (var groupGAgent in new List { groupGAgent1, groupGAgent2, groupGAgent3 }) { - var subscribers = await groupGAgent.GetChildrenAsync(); - subscribers.Count.ShouldBe(1); - subscribers.First().ShouldBe(naiveTestGAgent.GetGrainId()); + var children = await groupGAgent.GetChildrenAsync(); + children.Count.ShouldBe(1); + children.First().ShouldBe(naiveTestGAgent.GetGrainId()); } } @@ -163,7 +163,8 @@ public async Task SubscriptionTest() var groupGAgent = await CreateGroupGAgentAsync(marketingLeader, developingLeader); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(publishingGAgent, groupGAgent, marketingLeader, developingLeader, developer1, developer2, developer3, investor1, investor2); + AddProbesByGrainIdAsync(publishingGAgent, groupGAgent, marketingLeader, developingLeader, developer1, + developer2, developer3, investor1, investor2); // Act. await publishingGAgent.PublishEventAsync(new NewDemandTestEvent @@ -202,7 +203,7 @@ public async Task RegisterSameGrainTest() var gAgent = await Silo.CreateGrainAsync(guid); await gAgent.RegisterAsync(gAgent); var groupGAgent = await CreateGroupGAgentAsync(gAgent, gAgent); - + var subscribers = await groupGAgent.GetChildrenAsync(); subscribers.Count.ShouldBe(1); } diff --git a/test/Aevatar.Core.Tests/PublishingTests.cs b/test/Aevatar.Core.Tests/PublishingTests.cs index 9abf0578..77812a35 100644 --- a/test/Aevatar.Core.Tests/PublishingTests.cs +++ b/test/Aevatar.Core.Tests/PublishingTests.cs @@ -15,7 +15,7 @@ public async Task PublishToEventHandlerTest() var groupGAgent = await CreateGroupGAgentAsync(eventHandlerTestGAgent); var publishingGAgent = await CreatePublishingGAgentAsync(groupGAgent); - AddProbesByGrainId(publishingGAgent, groupGAgent, eventHandlerTestGAgent); + AddProbesByGrainIdAsync(publishingGAgent, groupGAgent, eventHandlerTestGAgent); // Act. await publishingGAgent.PublishEventAsync(new NaiveTestEvent @@ -46,7 +46,7 @@ public async Task MultiLevelDownwardsTest() var level1 = await CreateGroupGAgentAsync(level2A, level2B); var publishingGAgent = await CreatePublishingGAgentAsync(level1); - AddProbesByGrainId(publishingGAgent, level1, level2A, level2B, level3A, level3B); + await AddProbesByGrainIdAsync(publishingGAgent, level1, level2A, level2B, level3A, level3B); // Act. await publishingGAgent.PublishEventAsync(new NaiveTestEvent @@ -56,13 +56,13 @@ await publishingGAgent.PublishEventAsync(new NaiveTestEvent // Assert. var state3A = await level3A.GetStateAsync(); - state3A.Content.Count.ShouldBe(3); + state3A.Content.Count.ShouldBe(6); var state3B = await level3B.GetStateAsync(); - state3B.Content.Count.ShouldBe(3); + state3B.Content.Count.ShouldBe(6); var state2A = await level2A.GetStateAsync(); - state2A.Content.Count.ShouldBe(3); + state2A.Content.Count.ShouldBe(6); var state2B = await level2B.GetStateAsync(); - state2B.Content.Count.ShouldBe(3); + state2B.Content.Count.ShouldBe(4); } [Fact(DisplayName = "Event can be published upwards.")] @@ -78,7 +78,7 @@ public async Task MultiLevelUpwardsTest() var level1 = await CreateGroupGAgentAsync(level2A, level2B); var publishingGAgent = await CreatePublishingGAgentAsync(level1); - AddProbesByGrainId(publishingGAgent, level1, level2A, level2B, level3A, level3B); + AddProbesByGrainIdAsync(publishingGAgent, level1, level2A, level2B, level3A, level3B); // Act: ResponseTestEvent will cause level32 publish an NaiveTestEvent. await publishingGAgent.PublishEventAsync(new ResponseTestEvent @@ -88,13 +88,13 @@ await publishingGAgent.PublishEventAsync(new ResponseTestEvent // Assert: level31 and level21 should receive the response event, then has 1 + 3 content stored. var state3A = await level3A.GetStateAsync(); - state3A.Content.Count.ShouldBe(4); + state3A.Content.Count.ShouldBe(5); var state2A = await level2A.GetStateAsync(); - state2A.Content.Count.ShouldBe(4); + state2A.Content.Count.ShouldBe(5); // Assert: level22 should not receive the response event, then has 1 content stored (due to [AllEventHandler]). var state2B = await level2B.GetStateAsync(); - state2B.Content.Count.ShouldBe(1); + state2B.Content.Count.ShouldBe(2); } [Fact(DisplayName = "Everything works even if the same Guid is used for different grains.")] @@ -113,7 +113,7 @@ public async Task RegisterSameGuidTest() await level1.RegisterAsync(level2B); var publishingGAgent = await CreatePublishingGAgentAsync(level1); - AddProbesByGrainId(publishingGAgent, level1, level2A, level2B, level3A, level3B); + AddProbesByGrainIdAsync(publishingGAgent, level1, level2A, level2B, level3A, level3B); // Act: ResponseTestEvent will cause level32 publish an NaiveTestEvent. await publishingGAgent.PublishEventAsync(new ResponseTestEvent @@ -123,12 +123,12 @@ await publishingGAgent.PublishEventAsync(new ResponseTestEvent // Assert: level31 and level21 should receive the response event, then has 1 + 3 content stored. var state3A = await level3A.GetStateAsync(); - state3A.Content.Count.ShouldBe(4); + state3A.Content.Count.ShouldBe(5); var state2A = await level2A.GetStateAsync(); - state2A.Content.Count.ShouldBe(4); + state2A.Content.Count.ShouldBe(5); // Assert: level22 should not receive the response event, then has 1 content stored (due to [AllEventHandler]). var state2B = await level2B.GetStateAsync(); - state2B.Content.Count.ShouldBe(1); + state2B.Content.Count.ShouldBe(2); } } \ No newline at end of file diff --git a/test/Aevatar.Core.Tests/TestGAgents/ChildTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/ChildTestGAgent.cs new file mode 100644 index 00000000..b9d59a71 --- /dev/null +++ b/test/Aevatar.Core.Tests/TestGAgents/ChildTestGAgent.cs @@ -0,0 +1,37 @@ +using Aevatar.Core.Abstractions; +using Aevatar.Core.Tests.TestEvents; + +namespace Aevatar.Core.Tests.TestGAgents; + +[GenerateSerializer] + +public class ChildTestGAgentState : StateBase +{ + [Id(0)] public List Content { get; set; } = []; +} + +[GenerateSerializer] +public class ChildTestStateLogEvent : StateLogEventBase; + +public interface IChildTestGAgent : IStateGAgent; + +[GAgent] +public class ChildTestGAgent : GAgentBase, IChildTestGAgent +{ + public override Task GetDescriptionAsync() + { + return Task.FromResult("This is a GAgent for testing child."); + } + + [EventHandler] + public Task ExecuteAsync(NaiveTestEvent eventData) + { + AddContent(eventData.Greeting); + return Task.CompletedTask; + } + + private void AddContent(string content) + { + State.Content.Add(content); + } +} \ No newline at end of file diff --git a/test/Aevatar.Core.Tests/TestGAgents/DeveloperTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/DeveloperTestGAgent.cs index 2da6ac2e..25c16db9 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/DeveloperTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/DeveloperTestGAgent.cs @@ -19,14 +19,7 @@ public override Task GetDescriptionAsync() public async Task HandleEventAsync(DevelopTaskTestEvent eventData) { - if (State.Content.IsNullOrEmpty()) - { - State.Content = []; - } - State.Content.Add(eventData.Description); - - Logger.LogInformation("TEST"); return new NewFeatureCompletedTestEvent { PullRequestUrl = $"PR for {eventData.Description}" diff --git a/test/Aevatar.Core.Tests/TestGAgents/DevelopingLeaderTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/DevelopingLeaderTestGAgent.cs index e3b5d0ea..c44b2c1d 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/DevelopingLeaderTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/DevelopingLeaderTestGAgent.cs @@ -27,11 +27,6 @@ await PublishAsync(new DevelopTaskTestEvent public async Task HandleEventAsync(NewFeatureCompletedTestEvent eventData) { - if (State.Content.IsNullOrEmpty()) - { - State.Content = []; - } - State.Content.Add(eventData.PullRequestUrl); if (State.Content.Count == 3) diff --git a/test/Aevatar.Core.Tests/TestGAgents/EventHandlerTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/EventHandlerTestGAgent.cs index 2c2d9d21..dcb25fee 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/EventHandlerTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/EventHandlerTestGAgent.cs @@ -7,13 +7,15 @@ namespace Aevatar.Core.Tests.TestGAgents; [GenerateSerializer] public class EventHandlerTestGAgentState : StateBase { - [Id(0)] public List Content { get; set; } + [Id(0)] public List Content { get; set; } = []; } public class EventHandlerTestStateLogEvent : StateLogEventBase; +public interface IEventHandlerTestGAgent : IStateGAgent; + [GAgent("eventHandlerTest", "test")] -public class EventHandlerTestGAgent : GAgentBase +public class EventHandlerTestGAgent : GAgentBase, IEventHandlerTestGAgent { public override Task GetDescriptionAsync() { @@ -55,11 +57,6 @@ public Task HandleEventWithExceptionAsync(NaiveTestEvent eventData) private void AddContent(string content) { - if (State.Content.IsNullOrEmpty()) - { - State.Content = []; - } - State.Content.Add(content); } } \ No newline at end of file diff --git a/test/Aevatar.Core.Tests/TestGAgents/InvestorTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/InvestorTestGAgent.cs index b94ee457..653b6e64 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/InvestorTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/InvestorTestGAgent.cs @@ -4,7 +4,7 @@ namespace Aevatar.Core.Tests.TestGAgents; -public interface IInvestorTestGAgent: IGAgent +public interface IInvestorTestGAgent: IStateGAgent { } @@ -25,11 +25,6 @@ public override Task GetDescriptionAsync() public async Task HandleEventAsync(WorkingOnTestEvent eventData) { - if (State.Content.IsNullOrEmpty()) - { - State.Content = []; - } - State.Content.Add(eventData.Description); await PublishAsync(new InvestorFeedbackTestEvent diff --git a/test/Aevatar.Core.Tests/TestGAgents/MarketingLeaderTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/MarketingLeaderTestGAgent.cs index 28542e21..09513a9c 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/MarketingLeaderTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/MarketingLeaderTestGAgent.cs @@ -1,6 +1,5 @@ using Aevatar.Core.Abstractions; using Aevatar.Core.Tests.TestEvents; -using Microsoft.Extensions.Logging; namespace Aevatar.Core.Tests.TestGAgents; @@ -9,7 +8,8 @@ public interface IMarketingLeaderTestGAgent : IGAgent; public class MarketingLeaderTestGAgentState : NaiveTestGAgentState; [GAgent("marketingLeader", "test")] -public class MarketingLeaderTestGAgent : GAgentBase, IMarketingLeaderTestGAgent +public class MarketingLeaderTestGAgent : GAgentBase, + IMarketingLeaderTestGAgent { public override Task GetDescriptionAsync() { @@ -31,14 +31,9 @@ await PublishAsync(new WorkingOnTestEvent Description = $"Working completed: {eventData.PullRequestUrl}" }); } - + public async Task HandleEventAsync(InvestorFeedbackTestEvent eventData) { - if (State.Content.IsNullOrEmpty()) - { - State.Content = []; - } - State.Content.Add($"Feedback from investor: {eventData.Content}"); } } \ No newline at end of file diff --git a/test/Aevatar.Core.Tests/TestGAgents/NaiveTestGAgent.cs b/test/Aevatar.Core.Tests/TestGAgents/NaiveTestGAgent.cs index 9ded83e5..8b04389d 100644 --- a/test/Aevatar.Core.Tests/TestGAgents/NaiveTestGAgent.cs +++ b/test/Aevatar.Core.Tests/TestGAgents/NaiveTestGAgent.cs @@ -7,7 +7,7 @@ namespace Aevatar.Core.Tests.TestGAgents; [GenerateSerializer] public class NaiveTestGAgentState : StateBase { - [Id(0)] public List Content { get; set; } + [Id(0)] public List Content { get; set; } = []; } public class NaiveTestStateLogEvent : StateLogEventBase diff --git a/test/Aevatar.GAgents.Tests/GAgentBaseTests.cs b/test/Aevatar.GAgents.Tests/GAgentBaseTests.cs index 1d03dd63..4655a09f 100644 --- a/test/Aevatar.GAgents.Tests/GAgentBaseTests.cs +++ b/test/Aevatar.GAgents.Tests/GAgentBaseTests.cs @@ -47,12 +47,17 @@ public async Task ComplicatedEventHandleTest() var developer1 = _grainFactory.GetGrain(guid); var developer2 = _grainFactory.GetGrain(Guid.NewGuid()); var developer3 = _grainFactory.GetGrain(Guid.NewGuid()); + await developer1.ActivateAsync(); + await developer2.ActivateAsync(); + await developer3.ActivateAsync(); await developingLeader.RegisterAsync(developer1); await developingLeader.RegisterAsync(developer2); await developingLeader.RegisterAsync(developer3); var investor1 = _grainFactory.GetGrain>(guid); var investor2 = _grainFactory.GetGrain>(Guid.NewGuid()); + await investor1.ActivateAsync(); + await investor2.ActivateAsync(); await marketingLeader.RegisterAsync(investor1); await marketingLeader.RegisterAsync(investor2); @@ -69,7 +74,6 @@ await publishingGAgent.PublishEventAsync(new NewDemandTestEvent }); await TestHelper.WaitUntilAsync(_ => CheckState(investor1), TimeSpan.FromSeconds(20)); - var groupState = await groupGAgent.GetStateAsync(); groupState.RegisteredGAgents.ShouldBe(2); @@ -77,12 +81,14 @@ await publishingGAgent.PublishEventAsync(new NewDemandTestEvent investorState.Content.Count.ShouldBe(2); } + [Fact] public async Task SyncWorkerTest() { var guid = Guid.NewGuid(); // Arrange. var testGAgent = _grainFactory.GetGrain>(guid); + await testGAgent.ActivateAsync(); var publishingGAgent = _grainFactory.GetGrain(guid); await publishingGAgent.RegisterAsync(testGAgent); diff --git a/test/Aevatar.GAgents.Tests/GAgentEventPubTests.cs b/test/Aevatar.GAgents.Tests/GAgentEventPubTests.cs new file mode 100644 index 00000000..6f70328d --- /dev/null +++ b/test/Aevatar.GAgents.Tests/GAgentEventPubTests.cs @@ -0,0 +1,101 @@ +using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; +using Aevatar.Core.EventPublish; +using Aevatar.Core.Tests.TestEvents; +using Aevatar.Core.Tests.TestGAgents; +using Shouldly; +using Xunit.Abstractions; + +namespace Aevatar.GAgents.Tests; + +public sealed class GAgentEventPubTests : AevatarGAgentsTestBase +{ + private readonly ITestOutputHelper _outputHelper; + private readonly IGAgentFactory _gAgentFactory; + private readonly IGrainFactory _grainFactory; + + public GAgentEventPubTests(ITestOutputHelper outputHelper) + { + _outputHelper = outputHelper; + _gAgentFactory = GetRequiredService(); + _grainFactory = GetRequiredService(); + } + + [Fact] + public async Task RegisterTest() + { + var groupGAgent = await _gAgentFactory.GetGAgentAsync>(); + var gAgent = await _gAgentFactory.GetGAgentAsync(); + await groupGAgent.RegisterAsync(gAgent); + + var parent = await gAgent.GetParentAsync(); + parent.ShouldBe(groupGAgent.GetGrainId()); + + var children = await groupGAgent.GetChildrenAsync(); + children.Count.ShouldBe(1); + children.First().ShouldBe(gAgent.GetGrainId()); + + var childrenGroupGrain = + _grainFactory.GetGrain(0, groupGAgent.GetGrainId().ToString()); + (await childrenGroupGrain.GetChildrenCountAsync()).ShouldBe(1); + (await childrenGroupGrain.GetChildrenAsync()).First().ShouldBe(gAgent.GetGrainId()); + } + + [Fact] + public async Task UnregisterTest() + { + var groupGAgent = await _gAgentFactory.GetGAgentAsync>(); + var gAgent1 = await _gAgentFactory.GetGAgentAsync(); + var gAgent2 = await _gAgentFactory.GetGAgentAsync(); + await groupGAgent.RegisterAsync(gAgent1); + await groupGAgent.RegisterAsync(gAgent2); + await groupGAgent.UnregisterAsync(gAgent1); + + var parent = await gAgent1.GetParentAsync(); + parent.ShouldBe(default); + + var children = await groupGAgent.GetChildrenAsync(); + children.Count.ShouldBe(1); + children.First().ShouldBe(gAgent2.GetGrainId()); + } + + [Fact] + public async Task MultipleLevelTest() + { + // Arrange. + var marketingLeader = await _gAgentFactory.GetGAgentAsync(); + + var investor1 = await _gAgentFactory.GetGAgentAsync(); + var investor2 = await _gAgentFactory.GetGAgentAsync(); + await marketingLeader.RegisterAsync(investor1); + await marketingLeader.RegisterAsync(investor2); + + var groupGAgent = await _gAgentFactory.GetGAgentAsync>(); + await groupGAgent.RegisterAsync(marketingLeader); + var publishingGAgent = await _gAgentFactory.GetGAgentAsync(); + await publishingGAgent.RegisterAsync(groupGAgent); + + // Act. + await publishingGAgent.PublishEventAsync(new NewDemandTestEvent + { + Description = "New demand from customer." + }); + + await TestHelper.WaitUntilAsync(_ => CheckState(investor1), TimeSpan.FromSeconds(20)); + + { + var investorState = await investor1.GetStateAsync(); + investorState.Content.Count.ShouldBe(1); + } + { + var investorState = await investor2.GetStateAsync(); + investorState.Content.Count.ShouldBe(1); + } + } + + private async Task CheckState(IStateGAgent investor1) + { + var state = await investor1.GetStateAsync(); + return !state.Content.IsNullOrEmpty() && state.Content.Count == 1; + } +} \ No newline at end of file diff --git a/test/Aevatar.GAgents.Tests/SubscriptionTests.cs b/test/Aevatar.GAgents.Tests/SubscriptionTests.cs new file mode 100644 index 00000000..a3b5d258 --- /dev/null +++ b/test/Aevatar.GAgents.Tests/SubscriptionTests.cs @@ -0,0 +1,60 @@ +using System.Diagnostics; +using Aevatar.Core.Abstractions; +using Aevatar.Core.Tests.TestEvents; +using Aevatar.Core.Tests.TestGAgents; +using Shouldly; +using Xunit.Abstractions; + +namespace Aevatar.GAgents.Tests; + +public sealed class SubscriptionTests : AevatarGAgentsTestBase +{ + private readonly ITestOutputHelper _outputHelper; + private readonly IGAgentFactory _gAgentFactory; + + public SubscriptionTests(ITestOutputHelper outputHelper) + { + _outputHelper = outputHelper; + _gAgentFactory = GetRequiredService(); + } + + [Theory] + [InlineData(10)] + [InlineData(100)] + public async Task ManyChildrenTest(int count) + { + var stopWatch = new Stopwatch(); + stopWatch.Start(); + var children = new List(); + for (var i = 0; i < count; i++) + { + var child = await _gAgentFactory.GetGAgentAsync(); + children.Add(child); + } + + var groupGAgent = await _gAgentFactory.GetGAgentAsync>(Guid.NewGuid()); + foreach (var child in children) + { + await groupGAgent.RegisterAsync(child); + } + + var publishingGAgent = await _gAgentFactory.GetGAgentAsync(); + await publishingGAgent.RegisterAsync(groupGAgent); + await publishingGAgent.PublishEventAsync(new NaiveTestEvent + { + Greeting = "test" + }); + + await Task.Delay(10000); + + foreach (var child in children.AsParallel()) + { + var state = await child.GetStateAsync(); + state.Content.Count.ShouldBe(1); + state.Content[0].ShouldBe("test"); + } + + stopWatch.Stop(); + _outputHelper.WriteLine($"Time: {stopWatch.ElapsedMilliseconds}ms"); + } +} \ No newline at end of file diff --git a/test/Aevatar.TestBase/ClusterFixture.cs b/test/Aevatar.TestBase/ClusterFixture.cs index 3b6b36dd..733e7f8a 100644 --- a/test/Aevatar.TestBase/ClusterFixture.cs +++ b/test/Aevatar.TestBase/ClusterFixture.cs @@ -101,6 +101,7 @@ public void Configure(ISiloBuilder hostBuilder) services.AddSingleton(grainTypeMap); services.AddSingleton(); services.AddSingleton(); + services.AddSingleton(); }) .UseAevatar() .UseAevatarPermissionManagement() diff --git a/test/OrleansTestKit/TestGrainFactory.cs b/test/OrleansTestKit/TestGrainFactory.cs index d83c0b5f..389a5199 100644 --- a/test/OrleansTestKit/TestGrainFactory.cs +++ b/test/OrleansTestKit/TestGrainFactory.cs @@ -1,4 +1,5 @@ -using Moq; +using Aevatar.Core.Abstractions.EventPublish; +using Moq; using Orleans.Runtime; namespace Orleans.TestKit; @@ -102,10 +103,13 @@ internal void AddProbe(Func> factory) AddProbe(adaptedFactory); } - private static string GetKey(IdSpan identity, Type stateType, string? classPrefix = null) => - classPrefix == null - ? $"{stateType.FullName}-{identity}" - : $"{stateType.FullName}-{classPrefix}-{identity}"; + private static string GetKey(IdSpan identity, Type stateType, string? classPrefix = null) + { + var prefix = stateType.Name!.ToLower()[1..^5]; + return classPrefix == null + ? $"{prefix}/{identity}" + : $"{prefix}/{classPrefix}/{identity}"; + } private T GetProbe(IdSpan identity, string? grainClassNamePrefix) where T : IGrain @@ -117,6 +121,7 @@ private T GetProbe(IdSpan identity, string? grainClassNamePrefix) private IGrain GetProbe(Type grainType, IdSpan identity, string? grainClassNamePrefix) { var key = GetKey(identity, grainType, grainClassNamePrefix); + if (_probes.TryGetValue(key, out var grain)) { return grain; diff --git a/test/OrleansTestKit/TestKitSilo.cs b/test/OrleansTestKit/TestKitSilo.cs index 4bbabbde..25f59c9a 100644 --- a/test/OrleansTestKit/TestKitSilo.cs +++ b/test/OrleansTestKit/TestKitSilo.cs @@ -2,6 +2,8 @@ using Aevatar; using Aevatar.Core; using Aevatar.Core.Abstractions; +using Aevatar.Core.Abstractions.EventPublish; +using Aevatar.Core.EventPublish; using Aevatar.EventSourcing.Core; using Aevatar.EventSourcing.Core.Hosting; using Aevatar.EventSourcing.Core.LogConsistency; @@ -300,6 +302,16 @@ public async Task CreateGrainAsync(IdSpan identity, CancellationToken canc await GetReminderActivationContext(grain, cancellation).ConfigureAwait(false); } + if (typeof(IGAgent).IsAssignableFrom(typeof(T))) + { + var streamCoordinatorGrain = await CreateGrainAsync(grainId.ToString()); + GrainFactory.AddProbe(streamCoordinatorGrain.GetGrainId(), streamCoordinatorGrain); + var childrenGroupGrain = await CreateGrainAsync(0, grainId.ToString()); + GrainFactory.AddProbe(childrenGroupGrain.GetGrainId(), childrenGroupGrain); + var eventPubGrain = await CreateGrainAsync(grainId.ToString()); + GrainFactory.AddProbe(eventPubGrain.GetGrainId(), eventPubGrain); + } + await grain.OnActivateAsync(cancellation).ConfigureAwait(false); _activatedGrains.Add(grain);