Skip to content

Commit 24df21f

Browse files
committed
Adding kafka
1 parent d1209db commit 24df21f

18 files changed

Lines changed: 422 additions & 27 deletions
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
name: Deploy Event Bus Kafka NuGet
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
workflow_dispatch:
8+
9+
jobs:
10+
build-and-push:
11+
runs-on: ubuntu-latest
12+
steps:
13+
- name: Checkout repository
14+
uses: actions/checkout@v4
15+
16+
- name: Setup .NET (use global.json)
17+
uses: actions/setup-dotnet@v4
18+
19+
- name: Restore dependencies
20+
run: dotnet restore **/CodeChavez.EventBus.Kafka/CodeChavez.EventBus.Kafka.csproj --configfile nuget.config --no-cache
21+
22+
- name: Build project
23+
run: dotnet build **/CodeChavez.EventBus.Kafka/CodeChavez.EventBus.Kafka.csproj --configuration Release
24+
25+
- name: Package Common
26+
run: dotnet pack **/CodeChavez.EventBus.AbstractKafkaions/CodeChavez.EventBus.Kafka.csproj --configuration Release --no-build --output ./artifacts2
27+
28+
- name: Push NuGet package
29+
run: dotnet nuget push "./artifacts2/*.nupkg" --api-key $NUGET_AUTH_TOKEN --source https://api.nuget.org/v3/index.json
30+
env:
31+
NUGET_AUTH_TOKEN: ${{ secrets.NUGET_API_KEY }}

CodeChavez.EventBus.Abstractions.sln

Lines changed: 0 additions & 25 deletions
This file was deleted.

CodeChavez.EventBus.sln

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
2+
Microsoft Visual Studio Solution File, Format Version 12.00
3+
# Visual Studio Version 17
4+
VisualStudioVersion = 17.14.36301.6
5+
MinimumVisualStudioVersion = 10.0.40219.1
6+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CodeChavez.EventBus.Abstractions", "src\CodeChavez.EventBus.Abstractions\CodeChavez.EventBus.Abstractions.csproj", "{54618F5E-5C1B-4FE4-A4D8-C1CB2B10BB5D}"
7+
EndProject
8+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CodeChavez.EventBus.Kafka", "src\CodeChavez.EventBus.Kafka\CodeChavez.EventBus.Kafka.csproj", "{E7157B25-9CE2-4B4B-B024-333BF200CA14}"
9+
EndProject
10+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CodeChavez.EventBus.RabbitMQ", "src\CodeChavez.EventBus.RabbitMQ\CodeChavez.EventBus.RabbitMQ.csproj", "{3F1A21B6-FD4E-4493-A25C-47E80B3822B2}"
11+
EndProject
12+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CodeChavez.EventBus.MQTT", "src\CodeChavez.EventBus.MQTT\CodeChavez.EventBus.MQTT.csproj", "{8B56FBC1-440C-4479-A212-FB25F8251F0C}"
13+
EndProject
14+
Global
15+
GlobalSection(SolutionConfigurationPlatforms) = preSolution
16+
Debug|Any CPU = Debug|Any CPU
17+
Release|Any CPU = Release|Any CPU
18+
EndGlobalSection
19+
GlobalSection(ProjectConfigurationPlatforms) = postSolution
20+
{54618F5E-5C1B-4FE4-A4D8-C1CB2B10BB5D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
21+
{54618F5E-5C1B-4FE4-A4D8-C1CB2B10BB5D}.Debug|Any CPU.Build.0 = Debug|Any CPU
22+
{54618F5E-5C1B-4FE4-A4D8-C1CB2B10BB5D}.Release|Any CPU.ActiveCfg = Release|Any CPU
23+
{54618F5E-5C1B-4FE4-A4D8-C1CB2B10BB5D}.Release|Any CPU.Build.0 = Release|Any CPU
24+
{E7157B25-9CE2-4B4B-B024-333BF200CA14}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
25+
{E7157B25-9CE2-4B4B-B024-333BF200CA14}.Debug|Any CPU.Build.0 = Debug|Any CPU
26+
{E7157B25-9CE2-4B4B-B024-333BF200CA14}.Release|Any CPU.ActiveCfg = Release|Any CPU
27+
{E7157B25-9CE2-4B4B-B024-333BF200CA14}.Release|Any CPU.Build.0 = Release|Any CPU
28+
{3F1A21B6-FD4E-4493-A25C-47E80B3822B2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
29+
{3F1A21B6-FD4E-4493-A25C-47E80B3822B2}.Debug|Any CPU.Build.0 = Debug|Any CPU
30+
{3F1A21B6-FD4E-4493-A25C-47E80B3822B2}.Release|Any CPU.ActiveCfg = Release|Any CPU
31+
{3F1A21B6-FD4E-4493-A25C-47E80B3822B2}.Release|Any CPU.Build.0 = Release|Any CPU
32+
{8B56FBC1-440C-4479-A212-FB25F8251F0C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
33+
{8B56FBC1-440C-4479-A212-FB25F8251F0C}.Debug|Any CPU.Build.0 = Debug|Any CPU
34+
{8B56FBC1-440C-4479-A212-FB25F8251F0C}.Release|Any CPU.ActiveCfg = Release|Any CPU
35+
{8B56FBC1-440C-4479-A212-FB25F8251F0C}.Release|Any CPU.Build.0 = Release|Any CPU
36+
EndGlobalSection
37+
GlobalSection(SolutionProperties) = preSolution
38+
HideSolutionNode = FALSE
39+
EndGlobalSection
40+
GlobalSection(ExtensibilityGlobals) = postSolution
41+
SolutionGuid = {4EC7ECDF-0ACB-40A3-A27F-3599FBD516EA}
42+
EndGlobalSection
43+
EndGlobal

src/CodeChavez.EventBus.Abstractions/CodeChavez.EventBus.Abstractions.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<Title>CodeChavez EventBus Abstractions Library</Title>
1010
<Authors>Victor A Chavez</Authors>
1111
<Product>CodeChavez.EventBus.Abstractions</Product>
12-
<Version>9.0.0</Version>
12+
<Version>9.0.1</Version>
1313
<Copyright>2020-2025</Copyright>
1414
<Description>It has all necessary interfaces to be implemented</Description>
1515
<PackageReadmeFile>README.md</PackageReadmeFile>
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
namespace CodeChavez.EventBus.Abstractions.Interfaces;
2+
3+
public interface IConsumerProducerOptions
4+
{
5+
List<string> Servers { get; set; }
6+
7+
string Group { get; set; }
8+
9+
string Client { get; set; }
10+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
namespace CodeChavez.EventBus.Abstractions.Interfaces;
2+
3+
public interface IEventBusProducer
4+
{
5+
Task PublishAsync<TEvent>(TEvent @event, string? topic = null, CancellationToken cancellationToken = default)
6+
where TEvent : class;
7+
}

src/CodeChavez.EventBus.Abstractions/Interfaces/IIntegrationEventHandler.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,3 @@ public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEv
88
}
99

1010
public interface IIntegrationEventHandler { }
11-
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>net9.0</TargetFramework>
5+
<ImplicitUsings>enable</ImplicitUsings>
6+
<Nullable>enable</Nullable>
7+
<GeneratePackageOnBuild>True</GeneratePackageOnBuild>
8+
<PackageId>CodeChavez.EventBus.Kafka</PackageId>
9+
<Title>CodeChavez EventBus Kafka Library</Title>
10+
<Authors>Victor A Chavez</Authors>
11+
<Product>CodeChavez.EventBus.Kafka</Product>
12+
<Version>9.0.1</Version>
13+
<Copyright>2020-2025</Copyright>
14+
<Description>Stuff to connect to Kafka as a consumer or producer plus subcriptions</Description>
15+
<PackageReadmeFile>README.md</PackageReadmeFile>
16+
<PackageProjectUrl>https://github.com/codechavez/CodeChavez.EventBus</PackageProjectUrl>
17+
<RepositoryUrl>https://github.com/codechavez/CodeChavez.EventBus</RepositoryUrl>
18+
<PackageLicenseFile>LICENSE</PackageLicenseFile>
19+
<PackageTags>codechavez,event,bus,events,consumer,producer,helpers,utilities,.net</PackageTags>
20+
</PropertyGroup>
21+
22+
<ItemGroup>
23+
<None Include="..\..\README.md" Pack="true" PackagePath="\"/>
24+
<None Include="..\..\LICENSE" Pack="true" PackagePath="\"/>
25+
</ItemGroup>
26+
27+
<ItemGroup>
28+
<PackageReference Include="Confluent.Kafka" Version="2.11.1" />
29+
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.7" />
30+
</ItemGroup>
31+
32+
<ItemGroup>
33+
<ProjectReference Include="..\CodeChavez.EventBus.Abstractions\CodeChavez.EventBus.Abstractions.csproj" />
34+
</ItemGroup>
35+
36+
</Project>
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
using Confluent.Kafka;
2+
3+
namespace CodeChavez.EventBus.Kafka.Consumers;
4+
5+
public class ConsumerOptions : ConsumerConfig
6+
{
7+
8+
}
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
using Confluent.Kafka;
2+
using MediatR;
3+
using Microsoft.Extensions.Configuration;
4+
using Microsoft.Extensions.DependencyInjection;
5+
using Microsoft.Extensions.Hosting;
6+
using Microsoft.Extensions.Logging;
7+
using System.Text.Json;
8+
9+
namespace CodeChavez.EventBus.Kafka.Consumers;
10+
11+
public class MultiplexConsumer<TEvent> : BackgroundService where TEvent : class, INotification
12+
{
13+
private readonly string _topic;
14+
private readonly IServiceScopeFactory _scopeFactory;
15+
private readonly IConsumer<string, string> _consumer;
16+
private readonly ILogger<MultiplexConsumer<TEvent>> _logger;
17+
18+
private readonly int _batchSize;
19+
private readonly TimeSpan _pollTimeout;
20+
private readonly int _maxParallelism;
21+
22+
public MultiplexConsumer(
23+
string topic,
24+
ConsumerOptions config,
25+
IServiceScopeFactory scopeFactory,
26+
ILogger<MultiplexConsumer<TEvent>> logger,
27+
IConfiguration configuration
28+
)
29+
{
30+
_topic = topic;
31+
_scopeFactory = scopeFactory;
32+
_consumer = new ConsumerBuilder<string, string>(config).Build();
33+
_logger = logger;
34+
35+
_batchSize = configuration.GetValue<int>("EVENTBUS:Consumer:MaxConcurrency");
36+
_maxParallelism = configuration.GetValue<int>("EVENTBUS:Consumer:MaxParalleism");
37+
_pollTimeout = TimeSpan.FromMicroseconds(configuration.GetValue<int>("EVENTBUS:Consumer:PollTimeout"));
38+
}
39+
40+
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
41+
{
42+
_logger.LogInformation($"Listing for {_topic}");
43+
_consumer.Subscribe(_topic);
44+
45+
using var scope = _scopeFactory.CreateScope();
46+
var _mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
47+
48+
while (!cancellationToken.IsCancellationRequested)
49+
{
50+
try
51+
{
52+
var messages = new List<ConsumeResult<string, string>>(_batchSize);
53+
while (messages.Count < _batchSize)
54+
{
55+
var message = _consumer.Consume(_pollTimeout);
56+
if (message != null)
57+
{
58+
messages.Add(message);
59+
}
60+
else
61+
{
62+
break; // No more messages this cycle
63+
}
64+
}
65+
66+
if (messages.Count == 0)
67+
continue;
68+
69+
await Parallel.ForEachAsync(messages, new ParallelOptions
70+
{
71+
MaxDegreeOfParallelism = _maxParallelism,
72+
CancellationToken = cancellationToken
73+
},
74+
async (kafkaMessage, token) =>
75+
{
76+
try
77+
{
78+
var kafkaValue = kafkaMessage.Message.Value;
79+
var @event = JsonSerializer.Deserialize<TEvent>(kafkaValue);
80+
81+
if (@event is null)
82+
return;
83+
84+
using var scope = _scopeFactory.CreateScope();
85+
var mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
86+
87+
await mediator.Publish(@event, cancellationToken);
88+
}
89+
catch (OperationCanceledException) { /* normal during shutdown */ }
90+
catch (Exception ex)
91+
{
92+
_logger.LogError($"⚠️ Error processing message: {ex.Message}");
93+
}
94+
});
95+
}
96+
catch (OperationCanceledException) { /* shutting down */ }
97+
catch (Exception ex)
98+
{
99+
_logger.LogError($"❌ Kafka error on topic {_topic}: {ex.Message}");
100+
}
101+
}
102+
}
103+
104+
public override void Dispose()
105+
{
106+
_consumer.Close();
107+
_consumer.Dispose();
108+
}
109+
}

0 commit comments

Comments
 (0)