Kafka message communication

commit e9248f5ecc (parent a48865c311)
Author: 867824092
Date: 2021-08-23 21:34:37 +08:00
11 changed files with 503 additions and 17 deletions

File: Startup.cs

@@ -68,6 +68,7 @@ namespace Blog.Core
 services.AddRedisInitMqSetup();
 services.AddRabbitMQSetup();
+services.AddKafkaSetup(Configuration);
 services.AddEventBusSetup();
 services.AddNacosSetup(Configuration);

File: appsettings.json

@@ -31,6 +31,13 @@
   "UserName": "",
   "Password": "!",
   "RetryCount": 3
+  },
+  "Kafka": {
+    "Enabled": false,
+    "Servers": "localhost:9092",
+    "Topic": "blog",
+    "GroupId": "blog-consumer",
+    "NumPartitions": 3 // number of partitions for the topic
   },
   "EventBus": {
     "Enabled": false,

File: *.csproj

@@ -8,11 +8,13 @@
 <ItemGroup>
   <PackageReference Include="Autofac.Extensions.DependencyInjection" Version="6.0.0" />
   <PackageReference Include="Autofac.Extras.DynamicProxy" Version="5.0.0" />
+  <PackageReference Include="Confluent.Kafka" Version="1.7.0" />
   <PackageReference Include="Microsoft.Extensions.Logging" Version="5.0.0" />
   <PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
   <PackageReference Include="Polly" Version="7.2.1" />
+  <PackageReference Include="protobuf-net" Version="3.0.101" />
   <PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
 </ItemGroup>

File: EventBusKafka.cs

@@ -0,0 +1,118 @@
using Blog.Core.Common.Extensions;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System;

namespace Blog.Core.EventBus
{
    /// <summary>
    /// Kafka-based event bus
    /// </summary>
    public class EventBusKafka : IEventBus
    {
        private readonly ILogger<EventBusKafka> _logger;
        private readonly IEventBusSubscriptionsManager _subsManager;
        private readonly IKafkaConnectionPool _connectionPool;
        private readonly KafkaOptions _options;

        public EventBusKafka(ILogger<EventBusKafka> logger,
            IEventBusSubscriptionsManager subsManager,
            IKafkaConnectionPool connectionPool,
            IOptions<KafkaOptions> options)
        {
            _logger = logger;
            _subsManager = subsManager;
            _connectionPool = connectionPool;
            _options = options.Value;
        }

        /// <summary>
        /// Publish an event
        /// </summary>
        public void Publish(IntegrationEvent @event)
        {
            var producer = _connectionPool.Producer();
            try
            {
                var eventName = @event.GetType().Name;
                // JSON-serialize the event, then wrap the JSON string with protobuf-net
                var body = Protobuf.Serialize(JsonConvert.SerializeObject(@event));
                // block until the broker acknowledges delivery
                producer.ProduceAsync(_options.Topic, new Message<string, byte[]>
                {
                    Key = eventName,
                    Value = body
                }).ConfigureAwait(false).GetAwaiter().GetResult();
            }
            catch (Exception ex)
            {
                _logger.LogWarning($"Could not publish event: {@event.Id.ToString("N")} ({ex.Message}); Message: {JsonConvert.SerializeObject(@event)}");
            }
            finally
            {
                // return the producer to the pool
                _connectionPool.Return(producer);
            }
        }

        /// <summary>
        /// Subscribe (dynamic)
        /// </summary>
        /// <typeparam name="TH">Event handler</typeparam>
        /// <param name="eventName">Event name</param>
        public void SubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
            _subsManager.AddDynamicSubscription<TH>(eventName);
        }

        /// <summary>
        /// Subscribe
        /// </summary>
        /// <typeparam name="T">Constraint: event model</typeparam>
        /// <typeparam name="TH">Constraint: event handler for the event model</typeparam>
        public void Subscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();
            _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());
            _subsManager.AddSubscription<T, TH>();
        }

        /// <summary>
        /// Unsubscribe
        /// </summary>
        /// <typeparam name="T">Event model</typeparam>
        /// <typeparam name="TH">Event handler</typeparam>
        public void Unsubscribe<T, TH>()
            where T : IntegrationEvent
            where TH : IIntegrationEventHandler<T>
        {
            var eventName = _subsManager.GetEventKey<T>();
            _logger.LogInformation("Unsubscribing from event {EventName}", eventName);
            _subsManager.RemoveSubscription<T, TH>();
        }

        public void UnsubscribeDynamic<TH>(string eventName)
            where TH : IDynamicIntegrationEventHandler
        {
            _subsManager.RemoveDynamicSubscription<TH>(eventName);
        }

        public void Dispose()
        {
            _connectionPool?.Dispose();
            _subsManager.Clear();
        }
    }
}
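
For orientation, a minimal publishing sketch, not part of this commit: it assumes a hypothetical DemoDeletedIntegrationEvent with a parameterless base constructor; the real BlogDeletedIntegrationEvent wired up in EventBusSetup below plays the same role.

// Hypothetical usage sketch -- not part of this commit.
public class DemoDeletedIntegrationEvent : IntegrationEvent
{
    public int BlogId { get; set; }
}

public class BlogService
{
    private readonly IEventBus _eventBus;

    public BlogService(IEventBus eventBus) => _eventBus = eventBus;

    public void Delete(int id)
    {
        // ... remove the blog entity here ...
        // the event type name becomes the Kafka message key (see Publish above)
        _eventBus.Publish(new DemoDeletedIntegrationEvent { BlogId = id });
    }
}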

File: IKafkaConnectionPool.cs

@@ -0,0 +1,25 @@
using Confluent.Kafka;
using System;

namespace Blog.Core.EventBus
{
    /// <summary>
    /// Kafka connection pool
    /// </summary>
    public interface IKafkaConnectionPool : IDisposable
    {
        /// <summary>
        /// Rent a producer from the pool
        /// </summary>
        /// <returns></returns>
        IProducer<string, byte[]> Producer();

        /// <summary>
        /// Return a producer to the pool
        /// </summary>
        /// <param name="producer"></param>
        /// <returns>true if the producer was kept; false if it was disposed</returns>
        bool Return(IProducer<string, byte[]> producer);
    }
}

File: KafkaConnectionPool.cs

@@ -0,0 +1,79 @@
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Collections.Concurrent;
using System.Threading;

namespace Blog.Core.EventBus
{
    /// <summary>
    /// Kafka producer connection pool.
    /// Could also be built on Microsoft's official ObjectPool.
    /// </summary>
    public class KafkaConnectionPool : IKafkaConnectionPool
    {
        private readonly KafkaOptions _options;
        private ConcurrentQueue<IProducer<string, byte[]>> _producerPool = new();
        private int _currentCount; // number of producers currently held in the pool
        private int _maxSize;

        public KafkaConnectionPool(IOptions<KafkaOptions> options)
        {
            _options = options.Value;
            _maxSize = _options.ConnectionPoolSize;
        }

        /// <summary>
        /// Rent a producer from the pool
        /// </summary>
        /// <returns></returns>
        public IProducer<string, byte[]> Producer()
        {
            if (_producerPool.TryDequeue(out var producer))
            {
                Interlocked.Decrement(ref _currentCount);
                return producer;
            }
            // pool is empty: create a new producer
            var config = new ProducerConfig()
            {
                BootstrapServers = _options.Servers,
                QueueBufferingMaxMessages = 10,
                MessageTimeoutMs = 5000,
                RequestTimeoutMs = 3000
            };
            producer = new ProducerBuilder<string, byte[]>(config)
                .Build();
            return producer;
        }

        /// <summary>
        /// Return a producer to the pool
        /// </summary>
        /// <param name="producer"></param>
        /// <returns>true if the producer was kept; false if the pool was full and it was disposed</returns>
        public bool Return(IProducer<string, byte[]> producer)
        {
            if (Interlocked.Increment(ref _currentCount) <= _maxSize)
            {
                _producerPool.Enqueue(producer);
                return true;
            }
            // pool is full: dispose the surplus producer
            producer.Dispose();
            Interlocked.Decrement(ref _currentCount);
            return false;
        }

        public void Dispose()
        {
            _maxSize = 0;
            _currentCount = 0;
            while (_producerPool.TryDequeue(out var context))
            {
                context?.Dispose();
            }
        }
    }
}
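
The class comment above already points at Microsoft's ObjectPool as an alternative; a sketch of that variant, assuming the Microsoft.Extensions.ObjectPool package, not part of this commit:

// Hypothetical policy-based pool -- not part of this commit.
using Confluent.Kafka;
using Microsoft.Extensions.ObjectPool;

public class KafkaProducerPolicy : IPooledObjectPolicy<IProducer<string, byte[]>>
{
    private readonly string _servers;

    public KafkaProducerPolicy(string servers) => _servers = servers;

    // called when the pool has no idle producer
    public IProducer<string, byte[]> Create() =>
        new ProducerBuilder<string, byte[]>(new ProducerConfig { BootstrapServers = _servers }).Build();

    // returning true lets the pool keep the producer
    public bool Return(IProducer<string, byte[]> obj) => true;
}

// usage: the provider-created pool disposes surplus producers, since IProducer is IDisposable
// var provider = new DefaultObjectPoolProvider { MaximumRetained = 10 };
// var pool = provider.Create(new KafkaProducerPolicy("localhost:9092"));
// var producer = pool.Get(); /* ... */ pool.Return(producer);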

File: KafkaConsumerHostService.cs

@@ -0,0 +1,162 @@
using Autofac;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Blog.Core.EventBus
{
    /// <summary>
    /// Kafka consumer background service
    /// </summary>
    public class KafkaConsumerHostService : BackgroundService
    {
        private readonly string AUTOFAC_SCOPE_NAME = "blogcore_event_bus";
        private readonly ILogger<KafkaConsumerHostService> _logger;
        private readonly IConsumer<string, byte[]> _consumer;
        private readonly KafkaOptions _options;
        private readonly IEventBusSubscriptionsManager _subsManager;
        private readonly ILifetimeScope _autofac;
        private CancellationTokenSource cts = new();

        public KafkaConsumerHostService(ILogger<KafkaConsumerHostService> logger,
            IOptions<KafkaOptions> options,
            IEventBusSubscriptionsManager eventBusSubscriptionsManager,
            ILifetimeScope autofac)
        {
            _autofac = autofac;
            _subsManager = eventBusSubscriptionsManager;
            _logger = logger;
            _options = options.Value;
            _consumer = new ConsumerBuilder<string, byte[]>(new ConsumerConfig
            {
                BootstrapServers = _options.Servers,
                GroupId = _options.GroupId,
                AutoOffsetReset = AutoOffsetReset.Earliest,
                AllowAutoCreateTopics = true,
                EnableAutoCommit = false,
                LogConnectionClose = false
            }).SetErrorHandler(ConsumerClient_OnConsumeError)
              .Build();
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var result = await FetchTopicAsync();
            if (result)
            {
                _consumer.Subscribe(_options.Topic);
                while (!cts.Token.IsCancellationRequested)
                {
                    ConsumeResult<string, byte[]> consumerResult = null;
                    try
                    {
                        consumerResult = _consumer.Consume(cts.Token);
                        if (consumerResult.IsPartitionEOF || consumerResult.Message.Value == null) continue;
                        var @event = Protobuf.Deserialize<string>(consumerResult.Message.Value);
                        await ProcessEvent(consumerResult.Message.Key, @event);
                    }
                    catch (ConsumeException e)
                    {
                        _logger.LogError($"Error occurred: {e.Error.Reason}");
                    }
                    finally
                    {
                        // manual commit (EnableAutoCommit = false): at-least-once delivery
                        if (consumerResult != null)
                            _consumer.Commit(consumerResult);
                    }
                }
            }
        }

        public override Task StopAsync(CancellationToken cancellationToken)
        {
            cts.Cancel();
            _logger.LogInformation("Kafka consumer stopped and disposed");
            _consumer.Dispose();
            return base.StopAsync(cancellationToken);
        }

        /// <summary>
        /// Check that the topic exists, creating it if necessary
        /// </summary>
        /// <returns></returns>
        private async Task<bool> FetchTopicAsync()
        {
            if (string.IsNullOrEmpty(_options.Topic))
                throw new ArgumentNullException(nameof(_options.Topic));
            try
            {
                var config = new AdminClientConfig { BootstrapServers = _options.Servers };
                using var adminClient = new AdminClientBuilder(config).Build();
                await adminClient.CreateTopicsAsync(Enumerable.Range(0, 1).Select(u => new TopicSpecification
                {
                    Name = _options.Topic,
                    NumPartitions = _options.NumPartitions
                }));
            }
            catch (CreateTopicsException ex) when (ex.Message.Contains("already exists"))
            {
                // the topic already exists; nothing to do
            }
            catch (Exception ex)
            {
                _logger.LogError("An error was encountered when automatically creating the topic! -->" + ex.Message);
                return false;
            }
            return true;
        }

        /// <summary>
        /// Process a received message
        /// </summary>
        /// <param name="eventName">Event name</param>
        /// <param name="message">Message body</param>
        /// <returns></returns>
        private async Task ProcessEvent(string eventName, string message)
        {
            _logger.LogTrace("Processing Kafka event: {EventName}", eventName);
            if (_subsManager.HasSubscriptionsForEvent(eventName))
            {
                using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
                {
                    var subscriptions = _subsManager.GetHandlersForEvent(eventName);
                    foreach (var subscription in subscriptions)
                    {
                        if (subscription.IsDynamic)
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                            if (handler == null) continue;
                            dynamic eventData = JObject.Parse(message);
                            await Task.Yield();
                            await handler.Handle(eventData);
                        }
                        else
                        {
                            var handler = scope.ResolveOptional(subscription.HandlerType);
                            if (handler == null) continue;
                            var eventType = _subsManager.GetEventTypeByName(eventName);
                            var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                            var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
                            await Task.Yield();
                            await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                        }
                    }
                }
            }
            else
            {
                _logger.LogWarning("No subscription for Kafka event: {EventName}", eventName);
            }
        }

        private void ConsumerClient_OnConsumeError(IConsumer<string, byte[]> consumer, Error e)
        {
            _logger.LogError("An error occurred while connecting to Kafka: " + e.Reason);
        }
    }
}
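
And the consuming side, a hypothetical handler sketch, not part of this commit: it assumes IIntegrationEventHandler<T> exposes Task Handle(T), which is what the reflection call in ProcessEvent above expects, and reuses the DemoDeletedIntegrationEvent from the earlier sketch.

// Hypothetical handler sketch -- not part of this commit.
// ProcessEvent resolves handlers from the Autofac scope, so the handler must be
// registered in the container (compare AddTransient<BlogDeletedIntegrationEventHandler>() below).
public class DemoDeletedIntegrationEventHandler : IIntegrationEventHandler<DemoDeletedIntegrationEvent>
{
    private readonly ILogger<DemoDeletedIntegrationEventHandler> _logger;

    public DemoDeletedIntegrationEventHandler(ILogger<DemoDeletedIntegrationEventHandler> logger)
        => _logger = logger;

    public Task Handle(DemoDeletedIntegrationEvent @event)
    {
        _logger.LogInformation("Consumed Kafka event for blog {BlogId}", @event.BlogId);
        return Task.CompletedTask;
    }
}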

File: KafkaOptions.cs

@@ -0,0 +1,28 @@
namespace Blog.Core.EventBus
{
    /// <summary>
    /// Kafka configuration options
    /// </summary>
    public class KafkaOptions
    {
        /// <summary>
        /// Maximum number of producers kept in the connection pool
        /// </summary>
        public int ConnectionPoolSize { get; set; } = 10;

        /// <summary>
        /// Broker address(es), e.g. "localhost:9092"
        /// </summary>
        public string Servers { get; set; }

        /// <summary>
        /// Topic
        /// </summary>
        public string Topic { get; set; }

        /// <summary>
        /// Consumer group id
        /// </summary>
        public string GroupId { get; set; }

        /// <summary>
        /// Number of topic partitions
        /// </summary>
        public int NumPartitions { get; set; }
    }
}
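
These properties bind by name to the "Kafka" section added to appsettings.json above. A quick self-contained binding sketch, assuming the Microsoft.Extensions.Configuration.Json and .Binder packages, not part of this commit:

// Hypothetical binding check -- not part of this commit.
using Microsoft.Extensions.Configuration;

var configuration = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json")
    .Build();

// JSON keys map onto the properties above: Servers, Topic, GroupId, NumPartitions
var options = configuration.GetSection("Kafka").Get<KafkaOptions>();
// with the settings above: options.Servers == "localhost:9092", options.NumPartitions == 3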

File: Protobuf.cs

@@ -0,0 +1,32 @@
using System;
using System.IO;

namespace Blog.Core.EventBus
{
    public class Protobuf
    {
        /// <summary>
        /// Protobuf deserialization
        /// </summary>
        public static T Deserialize<T>(ReadOnlySpan<byte> data)
        {
            using Stream stream = new MemoryStream(data.ToArray());
            var info = ProtoBuf.Serializer.Deserialize<T>(stream);
            return info;
        }

        /// <summary>
        /// Serialize to bytes via Protobuf
        /// </summary>
        public static byte[] Serialize<T>(T data)
        {
            byte[] datas;
            using (var stream = new MemoryStream())
            {
                ProtoBuf.Serializer.Serialize(stream, data);
                datas = stream.ToArray();
            }
            return datas;
        }
    }
}
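
A round-trip sketch of how the bus uses this helper, not part of this commit: EventBusKafka.Publish wraps a JSON string into protobuf bytes, and KafkaConsumerHostService unwraps it with Deserialize<string> before the JSON is deserialized into the event type.

// Hypothetical round trip -- not part of this commit.
var payload = Protobuf.Serialize("{\"BlogId\":1}"); // JSON string -> protobuf-framed bytes
var json = Protobuf.Deserialize<string>(payload);   // bytes -> original JSON string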

File: EventBusSetup.cs

@@ -18,41 +18,47 @@ namespace Blog.Core.Extensions
 {
     if (services == null) throw new ArgumentNullException(nameof(services));
-    if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool() && Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool())
+    if (Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool())
     {
         var subscriptionClientName = Appsettings.app(new string[] { "EventBus", "SubscriptionClientName" });
         services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
         services.AddTransient<BlogDeletedIntegrationEventHandler>();
-        services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
-        {
-            var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
-            var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
-            var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
-            var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
-            var retryCount = 5;
-            if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" })))
-            {
-                retryCount = int.Parse(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }));
-            }
-            return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
-        });
+        if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool())
+        {
+            services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
+            {
+                var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
+                var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
+                var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
+                var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();
+                var retryCount = 5;
+                if (!string.IsNullOrEmpty(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" })))
+                {
+                    retryCount = int.Parse(Appsettings.app(new string[] { "RabbitMQ", "RetryCount" }));
+                }
+                return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);
+            });
+        }
+        if (Appsettings.app(new string[] { "Kafka", "Enabled" }).ObjToBool())
+        {
+            services.AddHostedService<KafkaConsumerHostService>();
+            services.AddSingleton<IEventBus, EventBusKafka>();
+        }
     }
 }

 public static void ConfigureEventBus(this IApplicationBuilder app)
 {
-    if (Appsettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool() && Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool())
+    if (Appsettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool())
     {
         var eventBus = app.ApplicationServices.GetRequiredService<IEventBus>();
         eventBus.Subscribe<BlogDeletedIntegrationEvent, BlogDeletedIntegrationEventHandler>();
     }
 }
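
One registration detail worth noting, an observation rather than a change in this commit: if both "RabbitMQ:Enabled" and "Kafka:Enabled" are true, AddSingleton<IEventBus, ...> runs twice, and GetRequiredService<IEventBus> in ConfigureEventBus resolves the registration added last (EventBusKafka), so the Subscribe call reaches only that bus.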

File: KafkaSetup.cs

@@ -0,0 +1,26 @@
using Blog.Core.Common;
using Blog.Core.EventBus;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;

namespace Blog.Core.Extensions
{
    /// <summary>
    /// Register Kafka-related configuration
    /// </summary>
    public static class KafkaSetup
    {
        public static void AddKafkaSetup(this IServiceCollection services, IConfiguration configuration)
        {
            if (services == null) throw new ArgumentNullException(nameof(services));
            if (Appsettings.app(new string[] { "Kafka", "Enabled" }).ObjToBool())
            {
                // configuration keys are case-insensitive, so "kafka" matches the "Kafka" section
                services.Configure<KafkaOptions>(configuration.GetSection("kafka"));
                services.AddSingleton<IKafkaConnectionPool, KafkaConnectionPool>();
            }
        }
    }
}
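
To turn the new path on end to end, both flags matter: "EventBus:Enabled" gates the subscription wiring in EventBusSetup above, while "Kafka:Enabled" gates this KafkaSetup registration plus the KafkaConsumerHostService and EventBusKafka registrations; both ship as false in the appsettings.json change at the top of this commit.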