From e9248f5ecc62107ca5d912f59fb022e10885aa40 Mon Sep 17 00:00:00 2001 From: 867824092 Date: Mon, 23 Aug 2021 21:34:37 +0800 Subject: [PATCH] Kafka message communication --- Blog.Core.Api/Startup.cs | 1 + Blog.Core.Api/appsettings.json | 7 + Blog.Core.EventBus/Blog.Core.EventBus.csproj | 2 + .../EventBusKafka/EventBusKafka.cs | 118 +++++++++++++ .../EventBusKafka/IKafkaConnectionPool.cs | 25 +++ .../EventBusKafka/KafkaConnectionPool.cs | 79 +++++++++ .../EventBusKafka/KafkaConsumerHostService.cs | 162 ++++++++++++++++++ .../EventBusKafka/KafkaOptions.cs | 28 +++ .../EventBusKafka/ProtobufTransfer.cs | 32 ++++ .../ServiceExtensions/EventBusSetup.cs | 40 +++-- .../ServiceExtensions/KafkaSetup.cs | 26 +++ 11 files changed, 503 insertions(+), 17 deletions(-) create mode 100644 Blog.Core.EventBus/EventBusKafka/EventBusKafka.cs create mode 100644 Blog.Core.EventBus/EventBusKafka/IKafkaConnectionPool.cs create mode 100644 Blog.Core.EventBus/EventBusKafka/KafkaConnectionPool.cs create mode 100644 Blog.Core.EventBus/EventBusKafka/KafkaConsumerHostService.cs create mode 100644 Blog.Core.EventBus/EventBusKafka/KafkaOptions.cs create mode 100644 Blog.Core.EventBus/EventBusKafka/ProtobufTransfer.cs create mode 100644 Blog.Core.Extensions/ServiceExtensions/KafkaSetup.cs diff --git a/Blog.Core.Api/Startup.cs b/Blog.Core.Api/Startup.cs index 36969a9..b2aaf7a 100644 --- a/Blog.Core.Api/Startup.cs +++ b/Blog.Core.Api/Startup.cs @@ -68,6 +68,7 @@ namespace Blog.Core services.AddRedisInitMqSetup(); services.AddRabbitMQSetup(); + services.AddKafkaSetup(Configuration); services.AddEventBusSetup(); services.AddNacosSetup(Configuration); diff --git a/Blog.Core.Api/appsettings.json b/Blog.Core.Api/appsettings.json index 989b1d0..39c865d 100644 --- a/Blog.Core.Api/appsettings.json +++ b/Blog.Core.Api/appsettings.json @@ -31,6 +31,13 @@ "UserName": "", "Password": "!", "RetryCount": 3 + }, + "Kafka": { + "Enabled": false, + "Servers": "localhost:9092", + "Topic": "blog", + "GroupId": "blog-consumer", + "NumPartitions": 3 //主题分区数量 }, "EventBus": { "Enabled": false, diff --git a/Blog.Core.EventBus/Blog.Core.EventBus.csproj b/Blog.Core.EventBus/Blog.Core.EventBus.csproj index d15a010..b71254a 100644 --- a/Blog.Core.EventBus/Blog.Core.EventBus.csproj +++ b/Blog.Core.EventBus/Blog.Core.EventBus.csproj @@ -8,11 +8,13 @@ + + diff --git a/Blog.Core.EventBus/EventBusKafka/EventBusKafka.cs b/Blog.Core.EventBus/EventBusKafka/EventBusKafka.cs new file mode 100644 index 0000000..6b2dd0f --- /dev/null +++ b/Blog.Core.EventBus/EventBusKafka/EventBusKafka.cs @@ -0,0 +1,118 @@ +using Blog.Core.Common.Extensions; +using Confluent.Kafka; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using System; + +namespace Blog.Core.EventBus +{ + /// + /// 基于Kafka的事件总线 + /// + public class EventBusKafka : IEventBus + { + private readonly ILogger _logger; + private readonly IEventBusSubscriptionsManager _subsManager; + private readonly IKafkaConnectionPool _connectionPool; + private readonly KafkaOptions _options; + public EventBusKafka(ILogger logger, + IEventBusSubscriptionsManager subsManager, + IKafkaConnectionPool connectionPool, + IOptions options) + { + _logger = logger; + _subsManager = subsManager; + _connectionPool = connectionPool; + _options = options.Value; + } + /// + /// 发布 + /// + public void Publish(IntegrationEvent @event) + { + var producer = _connectionPool.Producer(); + try + { + var eventName = @event.GetType().Name; + var body = Protobuf.Serialize(JsonConvert.SerializeObject(@event)); + DeliveryResult result = producer.ProduceAsync(_options.Topic, new Message + { + Key = eventName, + Value = body + }).ConfigureAwait(false).GetAwaiter().GetResult(); + } + catch (Exception ex) + { + _logger.LogWarning($"Could not publish event: {@event.Id.ToString("N")} ({ex.Message}); Message:{ JsonConvert.SerializeObject(@event)}"); + } + finally + { + //放入连接池中 + _connectionPool.Return(producer); + } + } + + /// + /// 订阅 + /// 动态 + /// + /// 事件处理器 + /// 事件名 + public void SubscribeDynamic(string eventName) + where TH : IDynamicIntegrationEventHandler + { + _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); + + _subsManager.AddDynamicSubscription(eventName); + } + + /// + /// 订阅 + /// + /// 约束:事件模型 + /// 约束:事件处理器<事件模型> + public void Subscribe() + where T : IntegrationEvent + where TH : IIntegrationEventHandler + { + var eventName = _subsManager.GetEventKey(); + + _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName()); + + _subsManager.AddSubscription(); + } + + /// + /// 取消订阅 + /// + /// + /// + public void Unsubscribe() + where T : IntegrationEvent + where TH : IIntegrationEventHandler + { + var eventName = _subsManager.GetEventKey(); + + _logger.LogInformation("Unsubscribing from event {EventName}", eventName); + + _subsManager.RemoveSubscription(); + } + + public void UnsubscribeDynamic(string eventName) + where TH : IDynamicIntegrationEventHandler + { + _subsManager.RemoveDynamicSubscription(eventName); + } + + public void Dispose() + { + if (_connectionPool != null) + { + _connectionPool.Dispose(); + } + _subsManager.Clear(); + } + + } +} diff --git a/Blog.Core.EventBus/EventBusKafka/IKafkaConnectionPool.cs b/Blog.Core.EventBus/EventBusKafka/IKafkaConnectionPool.cs new file mode 100644 index 0000000..0968e0e --- /dev/null +++ b/Blog.Core.EventBus/EventBusKafka/IKafkaConnectionPool.cs @@ -0,0 +1,25 @@ +using Confluent.Kafka; +using System; + + +namespace Blog.Core.EventBus +{ + /// + /// Kafka连接池 + /// + public interface IKafkaConnectionPool:IDisposable + { + /// + /// 取对象 + /// + /// + IProducer Producer(); + + /// + /// 将对象放入连接池 + /// + /// + /// + bool Return(IProducer producer); + } +} diff --git a/Blog.Core.EventBus/EventBusKafka/KafkaConnectionPool.cs b/Blog.Core.EventBus/EventBusKafka/KafkaConnectionPool.cs new file mode 100644 index 0000000..addd1f6 --- /dev/null +++ b/Blog.Core.EventBus/EventBusKafka/KafkaConnectionPool.cs @@ -0,0 +1,79 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System.Collections.Concurrent; +using System.Threading; + + +namespace Blog.Core.EventBus +{ + /// + /// Kafka producer 连接池管理 + /// 可以使用微软官方的对象池进行构造ObjectPool + /// + public class KafkaConnectionPool : IKafkaConnectionPool + { + private readonly KafkaOptions _options; + private ConcurrentQueue> _producerPool = new(); + private int _currentCount; + private int _maxSize; + public KafkaConnectionPool(IOptions options) + { + _options = options.Value; + _maxSize = _options.ConnectionPoolSize; + } + + /// + /// 取对象 + /// + /// + public IProducer Producer() + { + if (_producerPool.TryDequeue(out var producer)) + { + Interlocked.Decrement(ref _currentCount); + return producer; + } + + var config = new ProducerConfig() + { + BootstrapServers = _options.Servers, + QueueBufferingMaxMessages = 10, + MessageTimeoutMs = 5000, + RequestTimeoutMs = 3000 + }; + + producer = new ProducerBuilder(config) + .Build(); + return producer; + } + /// + /// 将对象放入连接池 + /// + /// + /// + public bool Return(IProducer producer) + { + if (Interlocked.Increment(ref _currentCount) <= _maxSize) + { + _producerPool.Enqueue(producer); + return true; + } + + producer.Dispose(); + Interlocked.Decrement(ref _currentCount); + + return false; + } + public void Dispose() + { + _maxSize = 0; + _currentCount = 0; + while (_producerPool.TryDequeue(out var context)) + { + context?.Dispose(); + } + } + + } +} diff --git a/Blog.Core.EventBus/EventBusKafka/KafkaConsumerHostService.cs b/Blog.Core.EventBus/EventBusKafka/KafkaConsumerHostService.cs new file mode 100644 index 0000000..27fcd00 --- /dev/null +++ b/Blog.Core.EventBus/EventBusKafka/KafkaConsumerHostService.cs @@ -0,0 +1,162 @@ +using Autofac; +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace Blog.Core.EventBus +{ + /// + /// Kafka consumer 监听服务 + /// + public class KafkaConsumerHostService : BackgroundService + { + private readonly string AUTOFAC_SCOPE_NAME = "blogcore_event_bus"; + private readonly ILogger _logger; + private readonly IConsumer _consumer; + private readonly KafkaOptions _options; + private readonly IEventBusSubscriptionsManager _subsManager; + private readonly ILifetimeScope _autofac; + private CancellationTokenSource cts = new(); + public KafkaConsumerHostService(ILogger logger, + IOptions options, + IEventBusSubscriptionsManager eventBusSubscriptionsManager, + ILifetimeScope autofac) + { + _autofac = autofac; + _subsManager = eventBusSubscriptionsManager; + _logger = logger; + _options = options.Value; + _consumer = new ConsumerBuilder(new ConsumerConfig + { + BootstrapServers = _options.Servers, + GroupId = _options.GroupId, + AutoOffsetReset = AutoOffsetReset.Earliest, + AllowAutoCreateTopics = true, + EnableAutoCommit = false, + LogConnectionClose = false + }).SetErrorHandler(ConsumerClient_OnConsumeError) + .Build(); + } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + var result = await FetchTopicAsync(); + if (result) + { + _consumer.Subscribe(_options.Topic); + while (!cts.Token.IsCancellationRequested) + { + var consumerResult = _consumer.Consume(cts.Token); + try + { + if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue; + + var @event = Protobuf.Deserialize(consumerResult.Message.Value); + await ProcessEvent(consumerResult.Message.Key, @event); + } + catch (ConsumeException e) + { + _logger.LogError($"Error occured: {e.Error.Reason}"); + } + finally + { + _consumer.Commit(consumerResult); + } + } + } + } + public override Task StopAsync(CancellationToken cancellationToken) + { + cts.Cancel(); + _logger.LogInformation("kafka consumer stop and disposable"); + _consumer.Dispose(); + return base.StopAsync(cancellationToken); + } + /// + /// 检测当前Topic是否存在 + /// + /// + private async Task FetchTopicAsync() + { + if (string.IsNullOrEmpty(_options.Topic)) + throw new ArgumentNullException(nameof(_options.Topic)); + + try + { + var config = new AdminClientConfig { BootstrapServers = _options.Servers }; + using var adminClient = new AdminClientBuilder(config).Build(); + await adminClient.CreateTopicsAsync(Enumerable.Range(0,1).Select(u=> new TopicSpecification + { + Name = _options.Topic, + NumPartitions = _options.NumPartitions + })); + } + catch (CreateTopicsException ex) when (ex.Message.Contains("already exists")) + { + } + catch (Exception ex) + { + _logger.LogError("An error was encountered when automatically creating topic! -->" + ex.Message); + return false; + } + return true; + } + /// + /// 接收到消息进行处理 + /// + /// 事件名称 + /// 消息内容 + /// + private async Task ProcessEvent(string eventName, string message) + { + _logger.LogTrace("Processing Kafka event: {EventName}", eventName); + + if (_subsManager.HasSubscriptionsForEvent(eventName)) + { + using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)) + { + var subscriptions = _subsManager.GetHandlersForEvent(eventName); + foreach (var subscription in subscriptions) + { + if (subscription.IsDynamic) + { + var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler; + if (handler == null) continue; + dynamic eventData = JObject.Parse(message); + + await Task.Yield(); + await handler.Handle(eventData); + } + else + { + var handler = scope.ResolveOptional(subscription.HandlerType); + if (handler == null) continue; + var eventType = _subsManager.GetEventTypeByName(eventName); + var integrationEvent = JsonConvert.DeserializeObject(message, eventType); + var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType); + + await Task.Yield(); + await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent }); + } + } + } + } + else + { + _logger.LogWarning("No subscription for Kafka event: {EventName}", eventName); + } + } + + private void ConsumerClient_OnConsumeError(IConsumer consumer, Error e) + { + _logger.LogError("An error occurred during connect kafka:" + e.Reason); + } + } +} diff --git a/Blog.Core.EventBus/EventBusKafka/KafkaOptions.cs b/Blog.Core.EventBus/EventBusKafka/KafkaOptions.cs new file mode 100644 index 0000000..637da2a --- /dev/null +++ b/Blog.Core.EventBus/EventBusKafka/KafkaOptions.cs @@ -0,0 +1,28 @@ + + +namespace Blog.Core.EventBus +{ + /// + /// Kafka 配置项 + /// + public class KafkaOptions + { + public int ConnectionPoolSize { get; set; } = 10; + /// + /// 地址 + /// + public string Servers { get; set; } + /// + /// 主题 + /// + public string Topic { get; set; } + /// + /// 消费者组Id + /// + public string GroupId { get; set; } + /// + /// 主题分区 + /// + public int NumPartitions { get; set; } + } +} diff --git a/Blog.Core.EventBus/EventBusKafka/ProtobufTransfer.cs b/Blog.Core.EventBus/EventBusKafka/ProtobufTransfer.cs new file mode 100644 index 0000000..c365db1 --- /dev/null +++ b/Blog.Core.EventBus/EventBusKafka/ProtobufTransfer.cs @@ -0,0 +1,32 @@ +using System; +using System.IO; +namespace Blog.Core.EventBus +{ + public class Protobuf + { + /// + /// Protobuf 反序列化 + /// + public static T Deserialize(ReadOnlySpan data) + { + Stream stream = new MemoryStream(data.ToArray()); + var info = ProtoBuf.Serializer.Deserialize(stream); + return info; + } + /// + /// 通过Protobuf 转字节 + /// + public static byte[] Serialize(T data) + { + byte[] datas; + using (var stream = new MemoryStream()) + { + ProtoBuf.Serializer.Serialize(stream, data); + datas = stream.ToArray(); + } + return datas; + + + } + } +} diff --git a/Blog.Core.Extensions/ServiceExtensions/EventBusSetup.cs b/Blog.Core.Extensions/ServiceExtensions/EventBusSetup.cs index 2d74c3e..197803c 100644 --- a/Blog.Core.Extensions/ServiceExtensions/EventBusSetup.cs +++ b/Blog.Core.Extensions/ServiceExtensions/EventBusSetup.cs @@ -18,41 +18,47 @@ namespace Blog.Core.Extensions { if (services == null) throw new ArgumentNullException(nameof(services)); - if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool() && Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool()) + if (Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool()) { var subscriptionClientName = Appsettings.app(new string[] { "EventBus", "SubscriptionClientName" }); - services.AddSingleton(); services.AddTransient(); - - services.AddSingleton(sp => + if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool()) { - var rabbitMQPersistentConnection = sp.GetRequiredService(); - var iLifetimeScope = sp.GetRequiredService(); - var logger = sp.GetRequiredService>(); - var eventBusSubcriptionsManager = sp.GetRequiredService(); - - var retryCount = 5; - if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }))) + services.AddSingleton(sp => { - retryCount = int.Parse(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" })); - } + var rabbitMQPersistentConnection = sp.GetRequiredService(); + var iLifetimeScope = sp.GetRequiredService(); + var logger = sp.GetRequiredService>(); + var eventBusSubcriptionsManager = sp.GetRequiredService(); - return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); - }); + var retryCount = 5; + if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }))) + { + retryCount = int.Parse(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" })); + } + + return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount); + }); + } + if(Appsettings.app(new string[] { "Kafka", "Enabled" }).ObjToBool()) + { + services.AddHostedService(); + services.AddSingleton(); + } } } public static void ConfigureEventBus(this IApplicationBuilder app) { - if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool() && Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool()) + if (Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool()) { var eventBus = app.ApplicationServices.GetRequiredService(); - eventBus.Subscribe(); + eventBus.Subscribe(); } } } diff --git a/Blog.Core.Extensions/ServiceExtensions/KafkaSetup.cs b/Blog.Core.Extensions/ServiceExtensions/KafkaSetup.cs new file mode 100644 index 0000000..79e60b9 --- /dev/null +++ b/Blog.Core.Extensions/ServiceExtensions/KafkaSetup.cs @@ -0,0 +1,26 @@ +using Blog.Core.Common; +using Blog.Core.EventBus; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; + +namespace Blog.Core.Extensions +{ + /// + /// 注入Kafka相关配置 + /// + public static class KafkaSetup + { + public static void AddKafkaSetup(this IServiceCollection services,IConfiguration configuration) + { + if (services == null) throw new ArgumentNullException(nameof(services)); + + if (Appsettings.app(new string[] { "Kafka", "Enabled" }).ObjToBool()) + { + services.Configure(configuration.GetSection("kafka")); + services.AddSingleton(); + } + } + } +}