Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -330,11 +330,21 @@ public static IServiceCollection AddMassTransitStreamPlugin(

if (options.Consumer.Enabled)
{
// Broadcast Mode for LocalHandler (HttpApi):
// Each instance uses a unique ConsumerGroupId so all instances receive all messages.
// Messages are filtered locally by checking if there's a subscriber for the StreamId.
// This is how Orleans Stream works for Clients - simple and reliable.
var baseGroupId = options.Kafka?.ConsumerGroupId ?? "aevatar-agents-group";
var isBroadcastMode = options.Consumer.DispatchHandler == DispatchHandler.LocalHandler;
var consumerGroupId = isBroadcastMode
? $"{baseGroupId}-{Environment.MachineName}-{System.Diagnostics.Process.GetCurrentProcess().Id}"
: baseGroupId;

foreach (var topic in consumerTopics)
{
k.TopicEndpoint<ByteArrayMessage>(
topic,
options.Kafka?.ConsumerGroupId ?? "aevatar-agents-group",
consumerGroupId,
e =>
{
e.AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Earliest;
Expand Down
23 changes: 23 additions & 0 deletions plugins/Aevatar.Agents.Plugins.MassTransit/ITraceIdExtractor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using Aevatar.Agents.Abstractions;

namespace Aevatar.Agents.Plugins.MassTransit;

/// <summary>
/// Pluggable trace ID extractor for MassTransit message dispatching.
///
/// Implementations can extract application-specific correlation/trace IDs
/// from event envelopes for logging and distributed tracing.
///
/// Register via DI to enable:
/// services.AddSingleton&lt;ITraceIdExtractor, MyAppTraceIdExtractor&gt;();
/// </summary>
public interface ITraceIdExtractor
{
/// <summary>
/// Try to extract a trace ID from the given event envelope.
/// </summary>
/// <param name="envelope">The parsed event envelope.</param>
/// <param name="traceId">The extracted trace ID, or null if not applicable.</param>
/// <returns>True if a trace ID was successfully extracted.</returns>
bool TryExtract(EventEnvelope envelope, out string? traceId);
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Linq;
using Aevatar.Agents.Abstractions;
using Google.Protobuf;
using MassTransit;
Expand Down Expand Up @@ -124,13 +125,27 @@ public Task<IMessageStreamSubscription> SubscribeAsync<T>(
};

_handlers.TryAdd(subscriptionId, wrapperHandler);

// Log with explicit format to show actual value (not Serilog-quoted)
_logger.LogInformation("[MassTransitMessageStream] Handler REGISTERED - StreamId='{StreamId}', StreamIdLength={Length}, SubscriptionId={SubscriptionId}, HandlerType={HandlerType}, TotalHandlers={Total}",
StreamId, StreamId?.Length ?? 0, subscriptionId, typeof(T).Name, _handlers.Count);

// Also log raw bytes to detect hidden characters
if (!string.IsNullOrEmpty(StreamId))
{
var bytes = System.Text.Encoding.UTF8.GetBytes(StreamId);
var hex = string.Join(" ", bytes.Take(50).Select(b => b.ToString("X2")));
_logger.LogDebug("[MassTransitMessageStream] StreamId raw bytes (first 50): {Hex}", hex);
}

return Task.FromResult<IMessageStreamSubscription>(
new MassTransitMessageStreamSubscription(
subscriptionId,
StreamId,
() => {
_handlers.TryRemove(subscriptionId, out _);
_logger.LogDebug("[MassTransitMessageStream] Handler UNREGISTERED - StreamId={StreamId}, SubscriptionId={SubscriptionId}, RemainingHandlers={Remaining}",
StreamId, subscriptionId, _handlers.Count);
return Task.CompletedTask;
}));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Linq;
using Aevatar.Agents.Abstractions;
using MassTransit;
using MassTransit.KafkaIntegration;
Expand Down Expand Up @@ -84,8 +85,32 @@ public IMessageStream GetStream(string agentId, string? category = null)
// A stream instance is tied to an AgentId.
// If we create it with a category, that category determines where it publishes TO.

return _streams.GetOrAdd(agentId, id =>
new MassTransitMessageStream(id, category, _bus, _serviceProvider, _options));
var logger = _serviceProvider.GetService<ILogger<MassTransitMessageStreamProvider>>();
var isNew = !_streams.ContainsKey(agentId);

var stream = _streams.GetOrAdd(agentId, id =>
{
logger?.LogInformation("[MassTransitMessageStreamProvider] Creating NEW stream - StreamId='{StreamId}', StreamIdLength={Length}, Category={Category}, TotalStreams={Total}",
id, id?.Length ?? 0, category ?? "null", _streams.Count + 1);

// Log raw bytes to detect hidden characters
if (!string.IsNullOrEmpty(id))
{
var bytes = System.Text.Encoding.UTF8.GetBytes(id);
var hex = string.Join(" ", bytes.Take(50).Select(b => b.ToString("X2")));
logger?.LogDebug("[MassTransitMessageStreamProvider] StreamId raw bytes (first 50): {Hex}", hex);
}

return new MassTransitMessageStream(id, category, _bus, _serviceProvider, _options);
});

if (!isNew)
{
logger?.LogDebug("[MassTransitMessageStreamProvider] Using EXISTING stream - StreamId={StreamId}, Category={Category}, TotalStreams={Total}",
agentId, category ?? "null", _streams.Count);
}

return stream;
}

/// <summary>
Expand All @@ -94,12 +119,61 @@ public IMessageStream GetStream(string agentId, string? category = null)
/// </summary>
internal MassTransitMessageStream? GetStreamInternal(string streamId)
{
_streams.TryGetValue(streamId, out var stream);
// Defensive: Try lookup with stripped quotes if direct lookup fails
var found = _streams.TryGetValue(streamId, out var stream);

if (!found && !string.IsNullOrEmpty(streamId))
{
// Try stripping quotes and lookup again
var stripped = streamId.Trim('"', '\'', '\u201C', '\u201D', ' ', '\t');
if (stripped.Length != streamId.Length)
{
found = _streams.TryGetValue(stripped, out stream);
if (found)
{
var logger = _serviceProvider.GetService<ILogger<MassTransitMessageStreamProvider>>();
logger?.LogDebug("[MassTransitMessageStreamProvider] Stream found after quote stripping: '{Original}' -> '{Stripped}'",
streamId, stripped);
}
}
}

if (!found)
{
var logger = _serviceProvider.GetService<ILogger<MassTransitMessageStreamProvider>>();
logger?.LogWarning("[MassTransitMessageStreamProvider] Stream NOT FOUND - StreamId='{StreamId}', TotalRegistered={Total}, RegisteredStreams=[{Streams}]",
streamId, _streams.Count, string.Join(", ", _streams.Keys.Take(10)));
}

return stream;
}

/// <summary>
/// Gets all registered stream IDs (for debugging).
/// </summary>
internal IEnumerable<string> GetAllStreamIds() => _streams.Keys;

/// <summary>
/// Fast check if there's a local subscriber for the given streamId.
/// Used by StreamMessageDispatcher for early filtering in broadcast mode.
/// This is O(1) lookup - no heavy processing.
/// </summary>
internal bool HasSubscriber(string streamId)
{
if (string.IsNullOrEmpty(streamId))
return false;

// Direct lookup
if (_streams.TryGetValue(streamId, out var stream) && stream.GetHandlerCount() > 0)
return true;

// Defensive: try with stripped quotes
var stripped = streamId.Trim('"', '\'', '\u201C', '\u201D', ' ', '\t');
if (stripped.Length != streamId.Length &&
_streams.TryGetValue(stripped, out stream) &&
stream.GetHandlerCount() > 0)
return true;

return false;
}
}
Loading