Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MongoDB.Driver;
using Orleans.Configuration;

namespace PluginGAgent.Silo.Extensions;

Expand All @@ -30,6 +31,11 @@ public static IHostBuilder UseOrleansConfiguration(this IHostBuilder hostBuilder
})
.AddMemoryStreams("Aevatar")
.UseAevatarPermissionManagement()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "default";
options.ServiceId = "testServiceId";
})
.UseAevatar()
;
})
Expand Down
1 change: 1 addition & 0 deletions src/Aevatar.Core.Abstractions/AevatarCoreConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ namespace Aevatar.Core.Abstractions;
public class AevatarCoreConstants
{
public const string StreamProvider = "Aevatar";
public const string DefaultStreamNamespace = "Aevatar";
public const char GAgentNamespaceSeparator = '.';
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this GAgentNamespaceSeparator serving any purpose?

}
14 changes: 14 additions & 0 deletions src/Aevatar.Core.Abstractions/AevatarCoreStreamConfig.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace Aevatar.Core.Abstractions;

public static class AevatarCoreStreamConfig
{
public static string Prefix { get; private set; } = AevatarCoreConstants.DefaultStreamNamespace;

public static void Initialize(string? prefix)
{
if (prefix != null)
{
Prefix = prefix;
}

Check warning on line 12 in src/Aevatar.Core.Abstractions/AevatarCoreStreamConfig.cs

View check run for this annotation

Codecov / codecov/patch

src/Aevatar.Core.Abstractions/AevatarCoreStreamConfig.cs#L10-L12

Added lines #L10 - L12 were not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace Aevatar.Core.Abstractions.EventPublish;

using Orleans.Concurrency;

public interface IEventPubChildrenGroupGrain : IGrainWithIntegerCompoundKey
{
Task DownwardsEventAsync(EventWrapperBase eventWrapper);
Task UpwardsEventAsync(EventWrapperBase eventWrapper);
Task AddChildAsync(GrainId childGrainId);
Task AddManyChildAsync(List<GrainId> childrenGrainIds);
Task RemoveChildAsync(GrainId childId);
[ReadOnly]
Task<List<GrainId>> GetChildrenAsync();
[ReadOnly]
Task<int> GetChildrenCountAsync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using Orleans.Concurrency;

namespace Aevatar.Core.Abstractions.EventPublish;

public interface IStreamCoordinatorGrain : IGrainWithStringKey
{
Task<bool> SetParentAsync(GrainId parentGrainId);
Task ClearParentAsync(GrainId parentGrainId);
Task SetGroupIndexAsync(int groupIndex);
[ReadOnly]
Task<int> GetGroupIndexAsync();
[ReadOnly]
Task<GrainId> GetParentAsync();
Task<int> RegisterChildAsync(GrainId childGrainId);
Task<int> RegisterManyChildAsync(List<GrainId> childrenGrainIds);
Task UnregisterChildAsync(GrainId childGrainId);
[ReadOnly]
Task<List<GrainId>> GetChildrenAsync();
Task PublishEventAsync(EventWrapperBase eventWrapper);
Task DownwardsEventAsync(EventWrapperBase eventWrapper);
Task UpwardsEventAsync(EventWrapperBase eventWrapper);
}
13 changes: 13 additions & 0 deletions src/Aevatar.Core.Abstractions/IStateAgent.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Orleans.Concurrency;
using Orleans.Streams;

namespace Aevatar.Core.Abstractions;

Expand Down Expand Up @@ -80,6 +81,18 @@ public interface IGAgent : IGrainWithGuidKey
/// <param name="configuration"></param>
/// <returns></returns>
Task ConfigAsync(ConfigurationBase configuration);

/// <summary>
/// Get GAgentAsyncObserver
/// </summary>
/// <returns></returns>
Task<IAsyncObserver<EventWrapperBase>> GetGAgentAsyncObserverAsync();

/// <summary>
/// Resume subscription of parent's stream.
/// </summary>
/// <returns></returns>
Task ResumeSubscriptionAsync(IAsyncStream<EventWrapperBase> stream);
}

public interface IStateGAgent<TState> : IGAgent
Expand Down
1 change: 1 addition & 0 deletions src/Aevatar.Core.Abstractions/StateBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ public abstract class StateBase
[Id(0)] public List<GrainId> Children { get; set; } = [];
[Id(1)] public GrainId? Parent { get; set; }
[Id(2)] public string? GAgentCreator { get; set; }
[Id(3)] public GrainId StreamCoordinatorGrainId { get; set; }

public void Apply(StateLogEventBase @stateLogEvent)
{
Expand Down
3 changes: 2 additions & 1 deletion src/Aevatar.Core/AevatarGAgentConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ public static class AevatarGAgentConstants
public const string ConfigDefaultMethodName = "PerformConfigAsync";
public const string ForwardEventMethodName = "ForwardEventAsync";
public const int MaxSyncWorkConcurrency = 4;
}
public const int MaxChildrenPerGroup = 1500;
}
141 changes: 141 additions & 0 deletions src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
using Aevatar.Core.Abstractions;
using Aevatar.Core.Abstractions.EventPublish;
using Aevatar.Core.Extensions;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Orleans.EventSourcing;
using Orleans.Providers;
using Orleans.Streams;
using Orleans.Concurrency;

namespace Aevatar.Core.EventPublish;

[GenerateSerializer]
public class EventPubChildrenGroupState : StateBase
{
[Id(0)] public List<GrainId> Children { get; set; } = new();

Check warning on line 16 in src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs

View workflow job for this annotation

GitHub Actions / publish (Aevatar.Core)

'EventPubChildrenGroupState.Children' hides inherited member 'StateBase.Children'. Use the new keyword if hiding was intended.
[Id(1)] public int ChildrenCount { get; set; }
}

[GenerateSerializer]
public class EventPubChildrenGroupStateLogEvent : StateLogEventBase<EventPubChildrenGroupStateLogEvent>;

[StorageProvider(ProviderName = "PubSubStore")]
[LogConsistencyProvider(ProviderName = "LogStorage")]
[Reentrant]
public class EventPubChildrenGroupGrain :
JournaledGrain<EventPubChildrenGroupState, StateLogEventBase<EventPubChildrenGroupStateLogEvent>>,
IEventPubChildrenGroupGrain
{
private readonly ILogger<EventPubChildrenGroupGrain> _logger;
private readonly IStreamProvider _streamProvider;

public EventPubChildrenGroupGrain(ILogger<EventPubChildrenGroupGrain> logger)
{
_logger = logger;
_streamProvider = this.GetStreamProvider(AevatarCoreConstants.StreamProvider);
}

public async Task DownwardsEventAsync(EventWrapperBase eventWrapper)
{
_logger.LogInformation("[{grainId}] Starting DownwardsEventAsync for Event: {eventWrapper}",
this.GetGrainId().ToString(),
JsonConvert.SerializeObject(eventWrapper));

foreach (var childGrainId in State.Children)
{
var stream = _streamProvider.GetEventWrapperBaseStream(childGrainId.ToString());
await stream.OnNextAsync(eventWrapper);
}
}

public async Task UpwardsEventAsync(EventWrapperBase eventWrapper)
{
_logger.LogInformation("[{grainId}] Starting UpwardsEventAsync for Event: {eventWrapper}",
this.GetGrainId().ToString(),
JsonConvert.SerializeObject(eventWrapper));

Check warning on line 56 in src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs

View check run for this annotation

Codecov / codecov/patch

src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs#L53-L56

Added lines #L53 - L56 were not covered by tests

var stream = _streamProvider.GetEventWrapperBaseStream(this.GetPrimaryKeyString());
await stream.OnNextAsync(eventWrapper);
}

Check warning on line 60 in src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs

View check run for this annotation

Codecov / codecov/patch

src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs#L58-L60

Added lines #L58 - L60 were not covered by tests

public async Task AddChildAsync(GrainId childGrainId)
{
_logger.LogInformation("[{grainId}] Adding child GrainId: {GrainId}", this.GetGrainId().ToString(),
childGrainId);

RaiseEvent(new AddChildStateLogEvent { ChildId = childGrainId });
await ConfirmEvents();
}

public async Task AddManyChildAsync(List<GrainId> childrenGrainIds)
{
_logger.LogInformation("[{grainId}] Adding multiple children GrainIds: {childrenGrainIds}",
this.GetGrainId().ToString(), string.Join(", ", childrenGrainIds.ToString()));

Check warning on line 74 in src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs

View check run for this annotation

Codecov / codecov/patch

src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs#L72-L74

Added lines #L72 - L74 were not covered by tests

base.RaiseEvent(new AddChildManyStateLogEvent
{
ChildrenIds = childrenGrainIds
});
await ConfirmEvents();
}

Check warning on line 81 in src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs

View check run for this annotation

Codecov / codecov/patch

src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs#L76-L81

Added lines #L76 - L81 were not covered by tests

public async Task RemoveChildAsync(GrainId childId)
{
_logger.LogInformation("[{grainId}] Removing child GrainId: {GrainId}", this.GetGrainId().ToString(),
childId);

RaiseEvent(new RemoveChildStateLogEvent { ChildId = childId });
await ConfirmEvents();
}

public Task<List<GrainId>> GetChildrenAsync()
{
return Task.FromResult(State.Children);
}

public Task<int> GetChildrenCountAsync()
{
return Task.FromResult(State.ChildrenCount);
}

protected override void TransitionState(EventPubChildrenGroupState state,
StateLogEventBase<EventPubChildrenGroupStateLogEvent> @event)
{
switch (@event)
{
case AddChildStateLogEvent addChildEvent:
state.Children.Add(addChildEvent.ChildId);
state.ChildrenCount++;
break;
case AddChildManyStateLogEvent addChildManyEvent:
state.Children.AddRange(addChildManyEvent.ChildrenIds);
state.ChildrenCount += addChildManyEvent.ChildrenIds.Count;
break;

Check warning on line 114 in src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs

View check run for this annotation

Codecov / codecov/patch

src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs#L112-L114

Added lines #L112 - L114 were not covered by tests
case RemoveChildStateLogEvent removeChildEvent:
state.Children.Remove(removeChildEvent.ChildId);
state.ChildrenCount--;
break;
}

base.TransitionState(state, @event);
}

[GenerateSerializer]
public class AddChildStateLogEvent : StateLogEventBase<EventPubChildrenGroupStateLogEvent>
{
[Id(0)] public required GrainId ChildId { get; set; }
}

[GenerateSerializer]
public class AddChildManyStateLogEvent : StateLogEventBase<EventPubChildrenGroupStateLogEvent>
{
[Id(0)] public required List<GrainId> ChildrenIds { get; set; }

Check warning on line 133 in src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs

View check run for this annotation

Codecov / codecov/patch

src/Aevatar.Core/EventPublish/EventPubChildrenGroupGrain.cs#L133

Added line #L133 was not covered by tests
}

[GenerateSerializer]
public class RemoveChildStateLogEvent : StateLogEventBase<EventPubChildrenGroupStateLogEvent>
{
[Id(0)] public required GrainId ChildId { get; set; }
}
}
Loading
Loading