Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
using Orleans.Configuration;

namespace PluginGAgent.Silo.Extensions;

Expand All @@ -30,6 +31,11 @@ public static IHostBuilder UseOrleansConfiguration(this IHostBuilder hostBuilder
})
.AddMemoryStreams("Aevatar")
.UseAevatarPermissionManagement()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "default";
options.ServiceId = "testServiceId";
})
.UseAevatar()
;
})
Expand Down
1 change: 1 addition & 0 deletions src/Aevatar.Core.Abstractions/AevatarCoreConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ namespace Aevatar.Core.Abstractions;
public class AevatarCoreConstants
{
public const string StreamProvider = "Aevatar";
public const string DefaultStreamNamespace = "Aevatar";
public const char GAgentNamespaceSeparator = '.';
}
14 changes: 14 additions & 0 deletions src/Aevatar.Core.Abstractions/AevatarCoreStreamConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Aevatar.Core.Abstractions;

public static class AevatarCoreStreamConfig
{
public static string Prefix { get; private set; } = AevatarCoreConstants.DefaultStreamNamespace;

public static void Initialize(string? prefix)
{
if (prefix != null)
{
Prefix = prefix;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace Aevatar.Core.Abstractions.EventPublish;

public interface IEventPubChildrenGroupGrain : IGrainWithIntegerCompoundKey
{
Task DownwardsEventAsync(EventWrapperBase eventWrapper);
Task UpwardsEventAsync(EventWrapperBase eventWrapper);
Task AddChildAsync(GrainId childGrainId);
Task AddManyChildAsync(List<GrainId> childrenGrainIds);
Task RemoveChildAsync(GrainId childId);
Task<List<GrainId>> GetChildrenAsync();
Task<int> GetChildrenCountAsync();
}
6 changes: 6 additions & 0 deletions src/Aevatar.Core.Abstractions/EventPublish/IEventPubGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Aevatar.Core.Abstractions.EventPublish;

public interface IEventPubGrain : IGrainWithStringKey
{
Task PublishEventAsync(EventWrapperBase eventWrapper);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Orleans.Concurrency;

namespace Aevatar.Core.Abstractions.EventPublish;

public interface IStreamCoordinatorGrain : IGrainWithStringKey
{
Task<bool> SetParentAsync(GrainId parentGrainId);
Task ClearParentAsync(GrainId parentGrainId);
Task SetGroupIndexAsync(int groupIndex);
[ReadOnly]
Task<int> GetGroupIndexAsync();
[ReadOnly]
Task<GrainId> GetParentAsync();
Task<int> RegisterChildAsync(GrainId childGrainId);
Task<int> RegisterManyChildAsync(List<GrainId> childrenGrainIds);
Task UnregisterChildAsync(GrainId childGrainId);
[ReadOnly]
Task<List<GrainId>> GetChildrenAsync();
Task PublishEventAsync(EventWrapperBase eventWrapper);
Task DownwardsEventAsync(EventWrapperBase eventWrapper);
Task UpwardsEventAsync(EventWrapperBase eventWrapper);
}
13 changes: 13 additions & 0 deletions src/Aevatar.Core.Abstractions/IStateAgent.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Orleans.Concurrency;
using Orleans.Streams;

namespace Aevatar.Core.Abstractions;

Expand Down Expand Up @@ -79,6 +80,18 @@ public interface IGAgent : IGrainWithGuidKey
/// <param name="configuration"></param>
/// <returns></returns>
Task ConfigAsync(ConfigurationBase configuration);

/// <summary>
/// Get GAgentAsyncObserver
/// </summary>
/// <returns></returns>
Task<IAsyncObserver<EventWrapperBase>> GetGAgentAsyncObserverAsync();

/// <summary>
/// Resume subscription of parent's stream.
/// </summary>
/// <returns></returns>
Task ResumeSubscriptionAsync(IAsyncStream<EventWrapperBase> stream);
}

public interface IStateGAgent<TState> : IGAgent
Expand Down
1 change: 1 addition & 0 deletions src/Aevatar.Core.Abstractions/StateBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public abstract class StateBase
[Id(0)] public List<GrainId> Children { get; set; } = [];
[Id(1)] public GrainId? Parent { get; set; }
[Id(2)] public string? GAgentCreator { get; set; }
[Id(3)] public GrainId StreamCoordinatorGrainId { get; set; }

public void Apply(StateLogEventBase @stateLogEvent)
{
Expand Down
1 change: 1 addition & 0 deletions src/Aevatar.Core/AevatarGAgentConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ public static class AevatarGAgentConstants
public const string ConfigDefaultMethodName = "PerformConfigAsync";
public const string ForwardEventMethodName = "ForwardEventAsync";
public const int MaxSyncWorkConcurrency = 4;
public const int MaxChildrenPerGroup = 2000;
}
116 changes: 116 additions & 0 deletions src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
using Aevatar.Core.Abstractions;
using Aevatar.Core.Abstractions.EventPublish;
using Aevatar.Core.Extensions;
using Orleans.EventSourcing;
using Orleans.Providers;
using Orleans.Streams;

[GenerateSerializer]
public class EventPubChildrenGroupState : StateBase
{
[Id(0)] public List<GrainId> Children { get; set; } = new();
[Id(1)] public int ChildrenCount { get; set; }
}

[GenerateSerializer]
public class EventPubChildrenGroupStateLogEvent : StateLogEventBase<EventPubChildrenGroupStateLogEvent>;

[StorageProvider(ProviderName = "PubSubStore")]
[LogConsistencyProvider(ProviderName = "LogStorage")]
public class EventPubChildrenGroupGrain :
JournaledGrain<EventPubChildrenGroupState, StateLogEventBase<EventPubChildrenGroupStateLogEvent>>,
IEventPubChildrenGroupGrain
{
private readonly IStreamProvider _streamProvider;

public EventPubChildrenGroupGrain()
{
_streamProvider = this.GetStreamProvider(AevatarCoreConstants.StreamProvider);
}

public async Task DownwardsEventAsync(EventWrapperBase eventWrapper)
{
foreach (var eventPubGrain in State.Children.Select(grainId =>
GrainFactory.GetGrain<IEventPubGrain>(grainId.ToString())))
{
await eventPubGrain.PublishEventAsync(eventWrapper);
}
}

public async Task UpwardsEventAsync(EventWrapperBase eventWrapper)
{
var stream = _streamProvider.GetEventWrapperBaseStream(this.GetPrimaryKeyString());
await stream.OnNextAsync(eventWrapper);
}

public async Task AddChildAsync(GrainId childGrainId)
{
RaiseEvent(new AddChildStateLogEvent { ChildId = childGrainId });
await ConfirmEvents();
}

public async Task AddManyChildAsync(List<GrainId> childrenGrainIds)
{
base.RaiseEvent(new AddChildManyStateLogEvent
{
ChildrenIds = childrenGrainIds
});
await ConfirmEvents();
}

public async Task RemoveChildAsync(GrainId childId)
{
RaiseEvent(new RemoveChildStateLogEvent { ChildId = childId });
await ConfirmEvents();
}

public Task<List<GrainId>> GetChildrenAsync()
{
return Task.FromResult(State.Children);
}

public Task<int> GetChildrenCountAsync()
{
return Task.FromResult(State.ChildrenCount);
}

protected override void TransitionState(EventPubChildrenGroupState state,
StateLogEventBase<EventPubChildrenGroupStateLogEvent> @event)
{
switch (@event)
{
case AddChildStateLogEvent addChildEvent:
state.Children.Add(addChildEvent.ChildId);
state.ChildrenCount++;
break;
case AddChildManyStateLogEvent addChildManyEvent:
state.Children.AddRange(addChildManyEvent.ChildrenIds);
state.ChildrenCount += addChildManyEvent.ChildrenIds.Count;
break;
case RemoveChildStateLogEvent removeChildEvent:
state.Children.Remove(removeChildEvent.ChildId);
state.ChildrenCount--;
break;
}

base.TransitionState(state, @event);
}

[GenerateSerializer]
public class AddChildStateLogEvent : StateLogEventBase<EventPubChildrenGroupStateLogEvent>
{
[Id(0)] public required GrainId ChildId { get; set; }
}

[GenerateSerializer]
public class AddChildManyStateLogEvent : StateLogEventBase<EventPubChildrenGroupStateLogEvent>
{
[Id(0)] public required List<GrainId> ChildrenIds { get; set; }
}

[GenerateSerializer]
public class RemoveChildStateLogEvent : StateLogEventBase<EventPubChildrenGroupStateLogEvent>
{
[Id(0)] public required GrainId ChildId { get; set; }
}
}
22 changes: 22 additions & 0 deletions src/Aevatar.Core/EventPublish/EventPubGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Aevatar.Core.Abstractions;
using Aevatar.Core.Abstractions.EventPublish;
using Aevatar.Core.Extensions;
using Orleans.Streams;

namespace Aevatar.Core.EventPublish;

public class EventPubGrain : Grain, IEventPubGrain
{
private readonly IStreamProvider _streamProvider;

public EventPubGrain()
{
_streamProvider = this.GetStreamProvider(AevatarCoreConstants.StreamProvider);
}

public async Task PublishEventAsync(EventWrapperBase eventWrapper)
{
var stream = _streamProvider.GetEventWrapperBaseStream(this.GetPrimaryKeyString());
await stream.OnNextAsync(eventWrapper);
}
}
Loading