This commit is contained in:
ansonzhang 2020-11-20 11:19:01 +00:00
parent cb41ccb259
commit 55437879b7
10 changed files with 182 additions and 16 deletions

View File

@ -4,10 +4,13 @@ using System.Linq;
namespace Blog.Core.EventBus
{
/// <summary>
/// 基于内存
/// 事件总线订阅管理器
/// 单例模式
/// </summary>
public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
{
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
private readonly List<Type> _eventTypes;
@ -22,12 +25,22 @@ namespace Blog.Core.EventBus
public bool IsEmpty => !_handlers.Keys.Any();
public void Clear() => _handlers.Clear();
/// <summary>
/// 添加动态订阅
/// </summary>
/// <typeparam name="TH">约束:动态事件处理器接口</typeparam>
/// <param name="eventName"></param>
public void AddDynamicSubscription<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler
{
DoAddSubscription(typeof(TH), eventName, isDynamic: true);
}
/// <summary>
/// 添加订阅
/// </summary>
/// <typeparam name="T">约束:事件</typeparam>
/// <typeparam name="TH">约束:事件处理器接口<事件></typeparam>
public void AddSubscription<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
@ -65,7 +78,11 @@ namespace Blog.Core.EventBus
}
}
/// <summary>
/// 移除动态订阅
/// </summary>
/// <typeparam name="TH"></typeparam>
/// <param name="eventName"></param>
public void RemoveDynamicSubscription<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler
{
@ -123,7 +140,12 @@ namespace Blog.Core.EventBus
return DoFindSubscriptionToRemove(eventName, typeof(TH));
}
/// <summary>
/// 查询订阅并移除
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="TH"></typeparam>
/// <returns></returns>
private SubscriptionInfo FindSubscriptionToRemove<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>

View File

@ -2,6 +2,9 @@
namespace Blog.Core.EventBus
{
/// <summary>
/// 订阅信息模型
/// </summary>
public class SubscriptionInfo
{
public bool IsDynamic { get; }

View File

@ -2,6 +2,10 @@
namespace Blog.Core.EventBus
{
/// <summary>
/// 动态集成事件处理程序
/// 接口
/// </summary>
public interface IDynamicIntegrationEventHandler
{
Task Handle(dynamic eventData);

View File

@ -1,21 +1,49 @@
namespace Blog.Core.EventBus
{
/// <summary>
/// 事件总线
/// 接口
/// </summary>
public interface IEventBus
{
/// <summary>
/// 发布
/// </summary>
/// <param name="event">事件模型</param>
void Publish(IntegrationEvent @event);
/// <summary>
/// 订阅
/// </summary>
/// <typeparam name="T">约束:事件模型</typeparam>
/// <typeparam name="TH">约束:事件处理器<事件模型></typeparam>
void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>;
void SubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
void UnsubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
/// <summary>
/// 取消订阅
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="TH"></typeparam>
void Unsubscribe<T, TH>()
where TH : IIntegrationEventHandler<T>
where T : IntegrationEvent;
/// <summary>
/// 动态订阅
/// </summary>
/// <typeparam name="TH">约束:事件处理器</typeparam>
/// <param name="eventName"></param>
void SubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
/// <summary>
/// 动态取消订阅
/// </summary>
/// <typeparam name="TH"></typeparam>
/// <param name="eventName"></param>
void UnsubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler;
}
}

View File

@ -3,6 +3,10 @@ using System.Collections.Generic;
namespace Blog.Core.EventBus
{
/// <summary>
/// 事件总线订阅管理器
/// 接口
/// </summary>
public interface IEventBusSubscriptionsManager
{
bool IsEmpty { get; }

View File

@ -2,12 +2,21 @@
namespace Blog.Core.EventBus
{
/// <summary>
/// 集成事件处理程序
/// 泛型接口
/// </summary>
/// <typeparam name="TIntegrationEvent"></typeparam>
public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler
where TIntegrationEvent : IntegrationEvent
{
Task Handle(TIntegrationEvent @event);
}
/// <summary>
/// 集成事件处理程序
/// 基 接口
/// </summary>
public interface IIntegrationEventHandler
{
}

View File

@ -3,6 +3,10 @@ using System;
namespace Blog.Core.EventBus
{
/// <summary>
/// 事件模型
/// 基类
/// </summary>
public class IntegrationEvent
{
public IntegrationEvent()

View File

@ -16,6 +16,9 @@ using System.Threading.Tasks;
namespace Blog.Core.EventBus
{
/// <summary>
/// 基于RabbitMQ的事件总线
/// </summary>
public class EventBusRabbitMQ : IEventBus, IDisposable
{
const string BROKER_NAME = "blogcore_event_bus";
@ -30,8 +33,20 @@ namespace Blog.Core.EventBus
private IModel _consumerChannel;
private string _queueName;
/// <summary>
/// RabbitMQ事件总线
/// </summary>
/// <param name="persistentConnection">RabbitMQ持久连接</param>
/// <param name="logger">日志</param>
/// <param name="autofac">autofac容器</param>
/// <param name="subsManager">事件总线订阅管理器</param>
/// <param name="queueName">队列名称</param>
/// <param name="retryCount">重试次数</param>
public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5)
ILifetimeScope autofac,
IEventBusSubscriptionsManager subsManager,
string queueName = null,
int retryCount = 5)
{
_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
@ -43,6 +58,11 @@ namespace Blog.Core.EventBus
_subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
}
/// <summary>
/// 订阅管理器事件
/// </summary>
/// <param name="sender"></param>
/// <param name="eventName"></param>
private void SubsManager_OnEventRemoved(object sender, string eventName)
{
if (!_persistentConnection.IsConnected)
@ -64,6 +84,10 @@ namespace Blog.Core.EventBus
}
}
/// <summary>
/// 发布
/// </summary>
/// <param name="event">事件模型</param>
public void Publish(IntegrationEvent @event)
{
if (!_persistentConnection.IsConnected)
@ -109,6 +133,12 @@ namespace Blog.Core.EventBus
}
}
/// <summary>
/// 订阅
/// 动态
/// </summary>
/// <typeparam name="TH">事件处理器</typeparam>
/// <param name="eventName">事件名</param>
public void SubscribeDynamic<TH>(string eventName)
where TH : IDynamicIntegrationEventHandler
{
@ -119,6 +149,11 @@ namespace Blog.Core.EventBus
StartBasicConsume();
}
/// <summary>
/// 订阅
/// </summary>
/// <typeparam name="T">约束:事件模型</typeparam>
/// <typeparam name="TH">约束:事件处理器<事件模型></typeparam>
public void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
@ -153,6 +188,11 @@ namespace Blog.Core.EventBus
}
}
/// <summary>
/// 取消订阅
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="TH"></typeparam>
public void Unsubscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
@ -180,6 +220,9 @@ namespace Blog.Core.EventBus
_subsManager.Clear();
}
/// <summary>
/// 开始基本消费
/// </summary>
private void StartBasicConsume()
{
_logger.LogTrace("Starting RabbitMQ basic consume");
@ -201,6 +244,12 @@ namespace Blog.Core.EventBus
}
}
/// <summary>
/// 消费者接受到
/// </summary>
/// <param name="sender"></param>
/// <param name="eventArgs"></param>
/// <returns></returns>
private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
{
var eventName = eventArgs.RoutingKey;
@ -226,6 +275,10 @@ namespace Blog.Core.EventBus
_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
}
/// <summary>
/// 创造消费通道
/// </summary>
/// <returns></returns>
private IModel CreateConsumerChannel()
{
if (!_persistentConnection.IsConnected)

View File

@ -3,6 +3,10 @@ using System;
namespace Blog.Core.EventBus
{
/// <summary>
/// RabbitMQ持久连接
/// 接口
/// </summary>
public interface IRabbitMQPersistentConnection
: IDisposable
{

View File

@ -10,6 +10,9 @@ using System.Net.Sockets;
namespace Blog.Core.EventBus
{
/// <summary>
/// RabbitMQ持久连接
/// </summary>
public class RabbitMQPersistentConnection
: IRabbitMQPersistentConnection
{
@ -21,13 +24,17 @@ namespace Blog.Core.EventBus
object sync_root = new object();
public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQPersistentConnection> logger, int retryCount = 5)
public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQPersistentConnection> logger,
int retryCount = 5)
{
_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_retryCount = retryCount;
}
/// <summary>
/// 是否已连接
/// </summary>
public bool IsConnected
{
get
@ -36,6 +43,10 @@ namespace Blog.Core.EventBus
}
}
/// <summary>
/// 创建Model
/// </summary>
/// <returns></returns>
public IModel CreateModel()
{
if (!IsConnected)
@ -46,6 +57,9 @@ namespace Blog.Core.EventBus
return _connection.CreateModel();
}
/// <summary>
/// 释放
/// </summary>
public void Dispose()
{
if (_disposed) return;
@ -62,6 +76,10 @@ namespace Blog.Core.EventBus
}
}
/// <summary>
/// 连接
/// </summary>
/// <returns></returns>
public bool TryConnect()
{
_logger.LogInformation("RabbitMQ Client is trying to connect");
@ -70,7 +88,9 @@ namespace Blog.Core.EventBus
{
var policy = RetryPolicy.Handle<SocketException>()
.Or<BrokerUnreachableException>()
.WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
.WaitAndRetry(_retryCount,
retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
{
_logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message);
}
@ -101,6 +121,11 @@ namespace Blog.Core.EventBus
}
}
/// <summary>
/// 连接被阻断
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
{
if (_disposed) return;
@ -110,6 +135,11 @@ namespace Blog.Core.EventBus
TryConnect();
}
/// <summary>
/// 连接出现异常
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
void OnCallbackException(object sender, CallbackExceptionEventArgs e)
{
if (_disposed) return;
@ -119,6 +149,11 @@ namespace Blog.Core.EventBus
TryConnect();
}
/// <summary>
/// 连接被关闭
/// </summary>
/// <param name="sender"></param>
/// <param name="reason"></param>
void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
{
if (_disposed) return;