diff --git a/Blog.Core.Api/Startup.cs b/Blog.Core.Api/Startup.cs
index 36969a9..b2aaf7a 100644
--- a/Blog.Core.Api/Startup.cs
+++ b/Blog.Core.Api/Startup.cs
@@ -68,6 +68,7 @@ namespace Blog.Core
services.AddRedisInitMqSetup();
services.AddRabbitMQSetup();
+ services.AddKafkaSetup(Configuration);
services.AddEventBusSetup();
services.AddNacosSetup(Configuration);
diff --git a/Blog.Core.Api/appsettings.json b/Blog.Core.Api/appsettings.json
index 989b1d0..39c865d 100644
--- a/Blog.Core.Api/appsettings.json
+++ b/Blog.Core.Api/appsettings.json
@@ -31,6 +31,13 @@
"UserName": "",
"Password": "!",
"RetryCount": 3
+ },
+ "Kafka": {
+ "Enabled": false,
+ "Servers": "localhost:9092",
+ "Topic": "blog",
+ "GroupId": "blog-consumer",
+ "NumPartitions": 3 //主题分区数量
},
"EventBus": {
"Enabled": false,
diff --git a/Blog.Core.EventBus/Blog.Core.EventBus.csproj b/Blog.Core.EventBus/Blog.Core.EventBus.csproj
index d15a010..b71254a 100644
--- a/Blog.Core.EventBus/Blog.Core.EventBus.csproj
+++ b/Blog.Core.EventBus/Blog.Core.EventBus.csproj
@@ -8,11 +8,13 @@
+
+
diff --git a/Blog.Core.EventBus/EventBusKafka/EventBusKafka.cs b/Blog.Core.EventBus/EventBusKafka/EventBusKafka.cs
new file mode 100644
index 0000000..6b2dd0f
--- /dev/null
+++ b/Blog.Core.EventBus/EventBusKafka/EventBusKafka.cs
@@ -0,0 +1,118 @@
+using Blog.Core.Common.Extensions;
+using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Newtonsoft.Json;
+using System;
+
+namespace Blog.Core.EventBus
+{
+ ///
+ /// 基于Kafka的事件总线
+ ///
+ public class EventBusKafka : IEventBus
+ {
+ private readonly ILogger<EventBusKafka> _logger;
+ private readonly IEventBusSubscriptionsManager _subsManager;
+ private readonly IKafkaConnectionPool _connectionPool;
+ private readonly KafkaOptions _options;
+ public EventBusKafka(ILogger<EventBusKafka> logger,
+ IEventBusSubscriptionsManager subsManager,
+ IKafkaConnectionPool connectionPool,
+ IOptions<KafkaOptions> options)
+ {
+ _logger = logger;
+ _subsManager = subsManager;
+ _connectionPool = connectionPool;
+ _options = options.Value;
+ }
+ ///
+ /// 发布
+ ///
+ public void Publish(IntegrationEvent @event)
+ {
+ var producer = _connectionPool.Producer();
+ try
+ {
+ var eventName = @event.GetType().Name;
+ var body = Protobuf.Serialize(JsonConvert.SerializeObject(@event));
+ DeliveryResult<string, byte[]> result = producer.ProduceAsync(_options.Topic, new Message<string, byte[]>
+ {
+ Key = eventName,
+ Value = body
+ }).ConfigureAwait(false).GetAwaiter().GetResult();
+ }
+ catch (Exception ex)
+ {
+ _logger.LogWarning($"Could not publish event: {@event.Id.ToString("N")} ({ex.Message}); Message:{ JsonConvert.SerializeObject(@event)}");
+ }
+ finally
+ {
+ //放入连接池中
+ _connectionPool.Return(producer);
+ }
+ }
+
+ ///
+ /// 订阅
+ /// 动态
+ ///
+ /// 事件处理器
+ /// 事件名
+ public void SubscribeDynamic<TH>(string eventName)
+ where TH : IDynamicIntegrationEventHandler
+ {
+ _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
+
+ _subsManager.AddDynamicSubscription<TH>(eventName);
+ }
+
+ ///
+ /// 订阅
+ ///
+ /// 约束:事件模型
+ /// 约束:事件处理器<事件模型>
+ public void Subscribe<T, TH>()
+ where T : IntegrationEvent
+ where TH : IIntegrationEventHandler<T>
+ {
+ var eventName = _subsManager.GetEventKey<T>();
+
+ _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
+
+ _subsManager.AddSubscription<T, TH>();
+ }
+
+ ///
+ /// 取消订阅
+ ///
+ ///
+ ///
+ public void Unsubscribe<T, TH>()
+ where T : IntegrationEvent
+ where TH : IIntegrationEventHandler<T>
+ {
+ var eventName = _subsManager.GetEventKey<T>();
+
+ _logger.LogInformation("Unsubscribing from event {EventName}", eventName);
+
+ _subsManager.RemoveSubscription<T, TH>();
+ }
+
+ public void UnsubscribeDynamic<TH>(string eventName)
+ where TH : IDynamicIntegrationEventHandler
+ {
+ _subsManager.RemoveDynamicSubscription<TH>(eventName);
+ }
+
+ public void Dispose()
+ {
+ if (_connectionPool != null)
+ {
+ _connectionPool.Dispose();
+ }
+ _subsManager.Clear();
+ }
+
+ }
+}
diff --git a/Blog.Core.EventBus/EventBusKafka/IKafkaConnectionPool.cs b/Blog.Core.EventBus/EventBusKafka/IKafkaConnectionPool.cs
new file mode 100644
index 0000000..0968e0e
--- /dev/null
+++ b/Blog.Core.EventBus/EventBusKafka/IKafkaConnectionPool.cs
@@ -0,0 +1,25 @@
+using Confluent.Kafka;
+using System;
+
+
+namespace Blog.Core.EventBus
+{
+ ///
+ /// Kafka连接池
+ ///
+ public interface IKafkaConnectionPool:IDisposable
+ {
+ ///
+ /// 取对象
+ ///
+ ///
+ IProducer<string, byte[]> Producer();
+
+ ///
+ /// 将对象放入连接池
+ ///
+ ///
+ ///
+ bool Return(IProducer<string, byte[]> producer);
+ }
+}
diff --git a/Blog.Core.EventBus/EventBusKafka/KafkaConnectionPool.cs b/Blog.Core.EventBus/EventBusKafka/KafkaConnectionPool.cs
new file mode 100644
index 0000000..addd1f6
--- /dev/null
+++ b/Blog.Core.EventBus/EventBusKafka/KafkaConnectionPool.cs
@@ -0,0 +1,79 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System.Collections.Concurrent;
+using System.Threading;
+
+
+namespace Blog.Core.EventBus
+{
+ ///
+ /// Kafka producer 连接池管理
+ /// 可以使用微软官方的对象池进行构造ObjectPool
+ ///
+ public class KafkaConnectionPool : IKafkaConnectionPool
+ {
+ private readonly KafkaOptions _options;
+ private ConcurrentQueue<IProducer<string, byte[]>> _producerPool = new();
+ private int _currentCount;
+ private int _maxSize;
+ public KafkaConnectionPool(IOptions<KafkaOptions> options)
+ {
+ _options = options.Value;
+ _maxSize = _options.ConnectionPoolSize;
+ }
+
+ ///
+ /// 取对象
+ ///
+ ///
+ public IProducer<string, byte[]> Producer()
+ {
+ if (_producerPool.TryDequeue(out var producer))
+ {
+ Interlocked.Decrement(ref _currentCount);
+ return producer;
+ }
+
+ var config = new ProducerConfig()
+ {
+ BootstrapServers = _options.Servers,
+ QueueBufferingMaxMessages = 10,
+ MessageTimeoutMs = 5000,
+ RequestTimeoutMs = 3000
+ };
+
+ producer = new ProducerBuilder<string, byte[]>(config)
+ .Build();
+ return producer;
+ }
+ ///
+ /// 将对象放入连接池
+ ///
+ ///
+ ///
+ public bool Return(IProducer<string, byte[]> producer)
+ {
+ if (Interlocked.Increment(ref _currentCount) <= _maxSize)
+ {
+ _producerPool.Enqueue(producer);
+ return true;
+ }
+
+ producer.Dispose();
+ Interlocked.Decrement(ref _currentCount);
+
+ return false;
+ }
+ public void Dispose()
+ {
+ _maxSize = 0;
+ _currentCount = 0;
+ while (_producerPool.TryDequeue(out var context))
+ {
+ context?.Dispose();
+ }
+ }
+
+ }
+}
diff --git a/Blog.Core.EventBus/EventBusKafka/KafkaConsumerHostService.cs b/Blog.Core.EventBus/EventBusKafka/KafkaConsumerHostService.cs
new file mode 100644
index 0000000..27fcd00
--- /dev/null
+++ b/Blog.Core.EventBus/EventBusKafka/KafkaConsumerHostService.cs
@@ -0,0 +1,162 @@
+using Autofac;
+using Confluent.Kafka;
+using Confluent.Kafka.Admin;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using Newtonsoft.Json;
+using Newtonsoft.Json.Linq;
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace Blog.Core.EventBus
+{
+ ///
+ /// Kafka consumer 监听服务
+ ///
+ public class KafkaConsumerHostService : BackgroundService
+ {
+ private readonly string AUTOFAC_SCOPE_NAME = "blogcore_event_bus";
+ private readonly ILogger<KafkaConsumerHostService> _logger;
+ private readonly IConsumer<string, byte[]> _consumer;
+ private readonly KafkaOptions _options;
+ private readonly IEventBusSubscriptionsManager _subsManager;
+ private readonly ILifetimeScope _autofac;
+ private CancellationTokenSource cts = new();
+ public KafkaConsumerHostService(ILogger<KafkaConsumerHostService> logger,
+ IOptions<KafkaOptions> options,
+ IEventBusSubscriptionsManager eventBusSubscriptionsManager,
+ ILifetimeScope autofac)
+ {
+ _autofac = autofac;
+ _subsManager = eventBusSubscriptionsManager;
+ _logger = logger;
+ _options = options.Value;
+ _consumer = new ConsumerBuilder<string, byte[]>(new ConsumerConfig
+ {
+ BootstrapServers = _options.Servers,
+ GroupId = _options.GroupId,
+ AutoOffsetReset = AutoOffsetReset.Earliest,
+ AllowAutoCreateTopics = true,
+ EnableAutoCommit = false,
+ LogConnectionClose = false
+ }).SetErrorHandler(ConsumerClient_OnConsumeError)
+ .Build();
+ }
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ var result = await FetchTopicAsync();
+ if (result)
+ {
+ _consumer.Subscribe(_options.Topic);
+ while (!cts.Token.IsCancellationRequested)
+ {
+ var consumerResult = _consumer.Consume(cts.Token);
+ try
+ {
+ if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue;
+
+ var @event = Protobuf.Deserialize<string>(consumerResult.Message.Value);
+ await ProcessEvent(consumerResult.Message.Key, @event);
+ }
+ catch (ConsumeException e)
+ {
+ _logger.LogError($"Error occured: {e.Error.Reason}");
+ }
+ finally
+ {
+ _consumer.Commit(consumerResult);
+ }
+ }
+ }
+ }
+ public override Task StopAsync(CancellationToken cancellationToken)
+ {
+ cts.Cancel();
+ _logger.LogInformation("kafka consumer stop and disposable");
+ _consumer.Dispose();
+ return base.StopAsync(cancellationToken);
+ }
+ ///
+ /// 检测当前Topic是否存在
+ ///
+ ///
+ private async Task FetchTopicAsync()
+ {
+ if (string.IsNullOrEmpty(_options.Topic))
+ throw new ArgumentNullException(nameof(_options.Topic));
+
+ try
+ {
+ var config = new AdminClientConfig { BootstrapServers = _options.Servers };
+ using var adminClient = new AdminClientBuilder(config).Build();
+ await adminClient.CreateTopicsAsync(Enumerable.Range(0,1).Select(u=> new TopicSpecification
+ {
+ Name = _options.Topic,
+ NumPartitions = _options.NumPartitions
+ }));
+ }
+ catch (CreateTopicsException ex) when (ex.Message.Contains("already exists"))
+ {
+ }
+ catch (Exception ex)
+ {
+ _logger.LogError("An error was encountered when automatically creating topic! -->" + ex.Message);
+ return false;
+ }
+ return true;
+ }
+ ///
+ /// 接收到消息进行处理
+ ///
+ /// 事件名称
+ /// 消息内容
+ ///
+ private async Task ProcessEvent(string eventName, string message)
+ {
+ _logger.LogTrace("Processing Kafka event: {EventName}", eventName);
+
+ if (_subsManager.HasSubscriptionsForEvent(eventName))
+ {
+ using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
+ {
+ var subscriptions = _subsManager.GetHandlersForEvent(eventName);
+ foreach (var subscription in subscriptions)
+ {
+ if (subscription.IsDynamic)
+ {
+ var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
+ if (handler == null) continue;
+ dynamic eventData = JObject.Parse(message);
+
+ await Task.Yield();
+ await handler.Handle(eventData);
+ }
+ else
+ {
+ var handler = scope.ResolveOptional(subscription.HandlerType);
+ if (handler == null) continue;
+ var eventType = _subsManager.GetEventTypeByName(eventName);
+ var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
+ var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
+
+ await Task.Yield();
+ await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
+ }
+ }
+ }
+ }
+ else
+ {
+ _logger.LogWarning("No subscription for Kafka event: {EventName}", eventName);
+ }
+ }
+
+ private void ConsumerClient_OnConsumeError(IConsumer<string, byte[]> consumer, Error e)
+ {
+ _logger.LogError("An error occurred during connect kafka:" + e.Reason);
+ }
+ }
+}
diff --git a/Blog.Core.EventBus/EventBusKafka/KafkaOptions.cs b/Blog.Core.EventBus/EventBusKafka/KafkaOptions.cs
new file mode 100644
index 0000000..637da2a
--- /dev/null
+++ b/Blog.Core.EventBus/EventBusKafka/KafkaOptions.cs
@@ -0,0 +1,28 @@
+
+
+namespace Blog.Core.EventBus
+{
+ ///
+ /// Kafka 配置项
+ ///
+ public class KafkaOptions
+ {
+ public int ConnectionPoolSize { get; set; } = 10;
+ ///
+ /// 地址
+ ///
+ public string Servers { get; set; }
+ ///
+ /// 主题
+ ///
+ public string Topic { get; set; }
+ ///
+ /// 消费者组Id
+ ///
+ public string GroupId { get; set; }
+ ///
+ /// 主题分区
+ ///
+ public int NumPartitions { get; set; }
+ }
+}
diff --git a/Blog.Core.EventBus/EventBusKafka/ProtobufTransfer.cs b/Blog.Core.EventBus/EventBusKafka/ProtobufTransfer.cs
new file mode 100644
index 0000000..c365db1
--- /dev/null
+++ b/Blog.Core.EventBus/EventBusKafka/ProtobufTransfer.cs
@@ -0,0 +1,32 @@
+using System;
+using System.IO;
+namespace Blog.Core.EventBus
+{
+ public class Protobuf
+ {
+ ///
+ /// Protobuf 反序列化
+ ///
+ public static T Deserialize<T>(ReadOnlySpan<byte> data)
+ {
+ Stream stream = new MemoryStream(data.ToArray());
+ var info = ProtoBuf.Serializer.Deserialize<T>(stream);
+ return info;
+ }
+ ///
+ /// 通过Protobuf 转字节
+ ///
+ public static byte[] Serialize<T>(T data)
+ {
+ byte[] datas;
+ using (var stream = new MemoryStream())
+ {
+ ProtoBuf.Serializer.Serialize(stream, data);
+ datas = stream.ToArray();
+ }
+ return datas;
+
+
+ }
+ }
+}
diff --git a/Blog.Core.Extensions/ServiceExtensions/EventBusSetup.cs b/Blog.Core.Extensions/ServiceExtensions/EventBusSetup.cs
index 2d74c3e..197803c 100644
--- a/Blog.Core.Extensions/ServiceExtensions/EventBusSetup.cs
+++ b/Blog.Core.Extensions/ServiceExtensions/EventBusSetup.cs
@@ -18,41 +18,47 @@ namespace Blog.Core.Extensions
{
if (services == null) throw new ArgumentNullException(nameof(services));
- if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool() && Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool())
+ if (Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool())
{
var subscriptionClientName = Appsettings.app(new string[] { "EventBus", "SubscriptionClientName" });
-
services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
services.AddTransient<BlogQueryEventHandler>();
-
- services.AddSingleton(sp =>
+ if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool())
{
- var rabbitMQPersistentConnection = sp.GetRequiredService();
- var iLifetimeScope = sp.GetRequiredService();
- var logger = sp.GetRequiredService>();
- var eventBusSubcriptionsManager = sp.GetRequiredService();
-
- var retryCount = 5;
- if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" })))
+ services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
{
- retryCount = int.Parse(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }));
- }
+ var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
+ var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
+ var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
+ var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
- return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
- });
+ var retryCount = 5;
+ if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" })))
+ {
+ retryCount = int.Parse(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }));
+ }
+
+ return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
+ });
+ }
+ if(Appsettings.app(new string[] { "Kafka", "Enabled" }).ObjToBool())
+ {
+ services.AddHostedService<KafkaConsumerHostService>();
+ services.AddSingleton<IEventBus, EventBusKafka>();
+ }
}
}
public static void ConfigureEventBus(this IApplicationBuilder app)
{
- if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool() && Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool())
+ if (Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool())
{
var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
- eventBus.Subscribe();
+ eventBus.Subscribe();
}
}
}
diff --git a/Blog.Core.Extensions/ServiceExtensions/KafkaSetup.cs b/Blog.Core.Extensions/ServiceExtensions/KafkaSetup.cs
new file mode 100644
index 0000000..79e60b9
--- /dev/null
+++ b/Blog.Core.Extensions/ServiceExtensions/KafkaSetup.cs
@@ -0,0 +1,26 @@
+using Blog.Core.Common;
+using Blog.Core.EventBus;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using System;
+
+namespace Blog.Core.Extensions
+{
+ ///
+ /// 注入Kafka相关配置
+ ///
+ public static class KafkaSetup
+ {
+ public static void AddKafkaSetup(this IServiceCollection services,IConfiguration configuration)
+ {
+ if (services == null) throw new ArgumentNullException(nameof(services));
+
+ if (Appsettings.app(new string[] { "Kafka", "Enabled" }).ObjToBool())
+ {
+ services.Configure<KafkaOptions>(configuration.GetSection("kafka"));
+ services.AddSingleton<IKafkaConnectionPool, KafkaConnectionPool>();
+ }
+ }
+ }
+}