diff --git a/Blog.Core.EventBus/EventBusSubscriptions/InMemoryEventBusSubscriptionsManager.cs b/Blog.Core.EventBus/EventBusSubscriptions/InMemoryEventBusSubscriptionsManager.cs index fe47cea..2612a27 100644 --- a/Blog.Core.EventBus/EventBusSubscriptions/InMemoryEventBusSubscriptionsManager.cs +++ b/Blog.Core.EventBus/EventBusSubscriptions/InMemoryEventBusSubscriptionsManager.cs @@ -4,10 +4,13 @@ using System.Linq; namespace Blog.Core.EventBus { + /// + /// 基于内存 + /// 事件总线订阅管理器 + /// 单例模式 + /// public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager { - - private readonly Dictionary> _handlers; private readonly List _eventTypes; @@ -22,12 +25,22 @@ namespace Blog.Core.EventBus public bool IsEmpty => !_handlers.Keys.Any(); public void Clear() => _handlers.Clear(); + /// + /// 添加动态订阅 + /// + /// 约束:动态事件处理器接口 + /// public void AddDynamicSubscription(string eventName) where TH : IDynamicIntegrationEventHandler { DoAddSubscription(typeof(TH), eventName, isDynamic: true); } + /// + /// 添加订阅 + /// + /// 约束:事件 + /// 约束:事件处理器接口<事件> public void AddSubscription() where T : IntegrationEvent where TH : IIntegrationEventHandler @@ -65,7 +78,11 @@ namespace Blog.Core.EventBus } } - + /// + /// 移除动态订阅 + /// + /// + /// public void RemoveDynamicSubscription(string eventName) where TH : IDynamicIntegrationEventHandler { @@ -123,7 +140,12 @@ namespace Blog.Core.EventBus return DoFindSubscriptionToRemove(eventName, typeof(TH)); } - + /// + /// 查询订阅并移除 + /// + /// + /// + /// private SubscriptionInfo FindSubscriptionToRemove() where T : IntegrationEvent where TH : IIntegrationEventHandler diff --git a/Blog.Core.EventBus/EventBusSubscriptions/SubscriptionInfo.cs b/Blog.Core.EventBus/EventBusSubscriptions/SubscriptionInfo.cs index 5157b45..456a4cc 100644 --- a/Blog.Core.EventBus/EventBusSubscriptions/SubscriptionInfo.cs +++ b/Blog.Core.EventBus/EventBusSubscriptions/SubscriptionInfo.cs @@ -2,6 +2,9 @@ namespace Blog.Core.EventBus { + /// + /// 订阅信息模型 + /// public class SubscriptionInfo { public bool IsDynamic { get; } diff --git a/Blog.Core.EventBus/Eventbus/IDynamicIntegrationEventHandler.cs b/Blog.Core.EventBus/Eventbus/IDynamicIntegrationEventHandler.cs index 5bf671c..b183ed5 100644 --- a/Blog.Core.EventBus/Eventbus/IDynamicIntegrationEventHandler.cs +++ b/Blog.Core.EventBus/Eventbus/IDynamicIntegrationEventHandler.cs @@ -2,6 +2,10 @@ namespace Blog.Core.EventBus { + /// + /// 动态集成事件处理程序 + /// 接口 + /// public interface IDynamicIntegrationEventHandler { Task Handle(dynamic eventData); diff --git a/Blog.Core.EventBus/Eventbus/IEventBus.cs b/Blog.Core.EventBus/Eventbus/IEventBus.cs index e89633f..7f3f60b 100644 --- a/Blog.Core.EventBus/Eventbus/IEventBus.cs +++ b/Blog.Core.EventBus/Eventbus/IEventBus.cs @@ -1,21 +1,49 @@ namespace Blog.Core.EventBus { + /// + /// 事件总线 + /// 接口 + /// public interface IEventBus { + /// + /// 发布 + /// + /// 事件模型 void Publish(IntegrationEvent @event); + /// + /// 订阅 + /// + /// 约束:事件模型 + /// 约束:事件处理器<事件模型> void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler; - void SubscribeDynamic(string eventName) - where TH : IDynamicIntegrationEventHandler; - - void UnsubscribeDynamic(string eventName) - where TH : IDynamicIntegrationEventHandler; - + /// + /// 取消订阅 + /// + /// + /// void Unsubscribe() where TH : IIntegrationEventHandler where T : IntegrationEvent; + + /// + /// 动态订阅 + /// + /// 约束:事件处理器 + /// + void SubscribeDynamic(string eventName) + where TH : IDynamicIntegrationEventHandler; + + /// + /// 动态取消订阅 + /// + /// + /// + void UnsubscribeDynamic(string eventName) + where TH : IDynamicIntegrationEventHandler; } } diff --git a/Blog.Core.EventBus/Eventbus/IEventBusSubscriptionsManager.cs b/Blog.Core.EventBus/Eventbus/IEventBusSubscriptionsManager.cs index 5090ec3..bfd3a0d 100644 --- a/Blog.Core.EventBus/Eventbus/IEventBusSubscriptionsManager.cs +++ b/Blog.Core.EventBus/Eventbus/IEventBusSubscriptionsManager.cs @@ -3,6 +3,10 @@ using System.Collections.Generic; namespace Blog.Core.EventBus { + /// + /// 事件总线订阅管理器 + /// 接口 + /// public interface IEventBusSubscriptionsManager { bool IsEmpty { get; } diff --git a/Blog.Core.EventBus/Eventbus/IIntegrationEventHandler.cs b/Blog.Core.EventBus/Eventbus/IIntegrationEventHandler.cs index 5eb609f..d955cdd 100644 --- a/Blog.Core.EventBus/Eventbus/IIntegrationEventHandler.cs +++ b/Blog.Core.EventBus/Eventbus/IIntegrationEventHandler.cs @@ -2,12 +2,21 @@ namespace Blog.Core.EventBus { + /// + /// 集成事件处理程序 + /// 泛型接口 + /// + /// public interface IIntegrationEventHandler : IIntegrationEventHandler where TIntegrationEvent : IntegrationEvent { Task Handle(TIntegrationEvent @event); } + /// + /// 集成事件处理程序 + /// 基 接口 + /// public interface IIntegrationEventHandler { } diff --git a/Blog.Core.EventBus/Eventbus/IntegrationEvent.cs b/Blog.Core.EventBus/Eventbus/IntegrationEvent.cs index 1f37f5e..a9f6a5e 100644 --- a/Blog.Core.EventBus/Eventbus/IntegrationEvent.cs +++ b/Blog.Core.EventBus/Eventbus/IntegrationEvent.cs @@ -3,6 +3,10 @@ using System; namespace Blog.Core.EventBus { + /// + /// 事件模型 + /// 基类 + /// public class IntegrationEvent { public IntegrationEvent() diff --git a/Blog.Core.EventBus/RabbitMQPersistent/EventBusRabbitMQ.cs b/Blog.Core.EventBus/RabbitMQPersistent/EventBusRabbitMQ.cs index f501542..5eddad7 100644 --- a/Blog.Core.EventBus/RabbitMQPersistent/EventBusRabbitMQ.cs +++ b/Blog.Core.EventBus/RabbitMQPersistent/EventBusRabbitMQ.cs @@ -16,6 +16,9 @@ using System.Threading.Tasks; namespace Blog.Core.EventBus { + /// + /// 基于RabbitMQ的事件总线 + /// public class EventBusRabbitMQ : IEventBus, IDisposable { const string BROKER_NAME = "blogcore_event_bus"; @@ -30,8 +33,20 @@ namespace Blog.Core.EventBus private IModel _consumerChannel; private string _queueName; + /// + /// RabbitMQ事件总线 + /// + /// RabbitMQ持久连接 + /// 日志 + /// autofac容器 + /// 事件总线订阅管理器 + /// 队列名称 + /// 重试次数 public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger logger, - ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5) + ILifetimeScope autofac, + IEventBusSubscriptionsManager subsManager, + string queueName = null, + int retryCount = 5) { _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); @@ -43,6 +58,11 @@ namespace Blog.Core.EventBus _subsManager.OnEventRemoved += SubsManager_OnEventRemoved; } + /// + /// 订阅管理器事件 + /// + /// + /// private void SubsManager_OnEventRemoved(object sender, string eventName) { if (!_persistentConnection.IsConnected) @@ -64,6 +84,10 @@ namespace Blog.Core.EventBus } } + /// + /// 发布 + /// + /// 事件模型 public void Publish(IntegrationEvent @event) { if (!_persistentConnection.IsConnected) @@ -109,6 +133,12 @@ namespace Blog.Core.EventBus } } + /// + /// 订阅 + /// 动态 + /// + /// 事件处理器 + /// 事件名 public void SubscribeDynamic(string eventName) where TH : IDynamicIntegrationEventHandler { @@ -119,6 +149,11 @@ namespace Blog.Core.EventBus StartBasicConsume(); } + /// + /// 订阅 + /// + /// 约束:事件模型 + /// 约束:事件处理器<事件模型> public void Subscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler @@ -153,6 +188,11 @@ namespace Blog.Core.EventBus } } + /// + /// 取消订阅 + /// + /// + /// public void Unsubscribe() where T : IntegrationEvent where TH : IIntegrationEventHandler @@ -180,6 +220,9 @@ namespace Blog.Core.EventBus _subsManager.Clear(); } + /// + /// 开始基本消费 + /// private void StartBasicConsume() { _logger.LogTrace("Starting RabbitMQ basic consume"); @@ -201,6 +244,12 @@ namespace Blog.Core.EventBus } } + /// + /// 消费者接受到 + /// + /// + /// + /// private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs) { var eventName = eventArgs.RoutingKey; @@ -226,6 +275,10 @@ namespace Blog.Core.EventBus _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false); } + /// + /// 创造消费通道 + /// + /// private IModel CreateConsumerChannel() { if (!_persistentConnection.IsConnected) diff --git a/Blog.Core.EventBus/RabbitMQPersistent/IRabbitMQPersistentConnection.cs b/Blog.Core.EventBus/RabbitMQPersistent/IRabbitMQPersistentConnection.cs index c8fe4e3..c90b0d4 100644 --- a/Blog.Core.EventBus/RabbitMQPersistent/IRabbitMQPersistentConnection.cs +++ b/Blog.Core.EventBus/RabbitMQPersistent/IRabbitMQPersistentConnection.cs @@ -3,6 +3,10 @@ using System; namespace Blog.Core.EventBus { + /// + /// RabbitMQ持久连接 + /// 接口 + /// public interface IRabbitMQPersistentConnection : IDisposable { diff --git a/Blog.Core.EventBus/RabbitMQPersistent/RabbitMQPersistentConnection.cs b/Blog.Core.EventBus/RabbitMQPersistent/RabbitMQPersistentConnection.cs index 0952a78..be2d8de 100644 --- a/Blog.Core.EventBus/RabbitMQPersistent/RabbitMQPersistentConnection.cs +++ b/Blog.Core.EventBus/RabbitMQPersistent/RabbitMQPersistentConnection.cs @@ -10,6 +10,9 @@ using System.Net.Sockets; namespace Blog.Core.EventBus { + /// + /// RabbitMQ持久连接 + /// public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection { @@ -21,13 +24,17 @@ namespace Blog.Core.EventBus object sync_root = new object(); - public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger logger, int retryCount = 5) + public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger logger, + int retryCount = 5) { _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory)); _logger = logger ?? throw new ArgumentNullException(nameof(logger)); _retryCount = retryCount; } + /// + /// 是否已连接 + /// public bool IsConnected { get @@ -36,6 +43,10 @@ namespace Blog.Core.EventBus } } + /// + /// 创建Model + /// + /// public IModel CreateModel() { if (!IsConnected) @@ -46,6 +57,9 @@ namespace Blog.Core.EventBus return _connection.CreateModel(); } + /// + /// 释放 + /// public void Dispose() { if (_disposed) return; @@ -62,6 +76,10 @@ namespace Blog.Core.EventBus } } + /// + /// 连接 + /// + /// public bool TryConnect() { _logger.LogInformation("RabbitMQ Client is trying to connect"); @@ -70,10 +88,12 @@ namespace Blog.Core.EventBus { var policy = RetryPolicy.Handle() .Or() - .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => - { - _logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message); - } + .WaitAndRetry(_retryCount, + retryAttempt => + TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => + { + _logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message); + } ); policy.Execute(() => @@ -101,6 +121,11 @@ namespace Blog.Core.EventBus } } + /// + /// 连接被阻断 + /// + /// + /// private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) { if (_disposed) return; @@ -110,6 +135,11 @@ namespace Blog.Core.EventBus TryConnect(); } + /// + /// 连接出现异常 + /// + /// + /// void OnCallbackException(object sender, CallbackExceptionEventArgs e) { if (_disposed) return; @@ -119,6 +149,11 @@ namespace Blog.Core.EventBus TryConnect(); } + /// + /// 连接被关闭 + /// + /// + /// void OnConnectionShutdown(object sender, ShutdownEventArgs reason) { if (_disposed) return;