feat: 🚡 RabbitMQ

This commit is contained in:
anjoy8 2023-12-01 11:18:21 +08:00
parent 59e729fa2c
commit c4a6c84d96
3 changed files with 76 additions and 27 deletions

View File

@ -92,22 +92,14 @@ namespace Blog.Core.Controllers
/// </summary>
[HttpGet]
[AllowAnonymous]
public void TestRabbitMqPublish()
public IActionResult TestRabbitMqPublish()
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using var channel = _persistentConnection.CreateModel();
var message = " < i am a sender! > ";
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
channel.BasicPublish(
exchange: "blogcore",
routingKey: "eventName",
mandatory: true,
basicProperties: properties,
body: body);
_persistentConnection.PublishMessage("Hello, RabbitMQ!", exchangeName: "blogcore", routingKey: "myRoutingKey");
return Ok();
}
/// <summary>
@ -115,28 +107,15 @@ namespace Blog.Core.Controllers
/// </summary>
[HttpGet]
[AllowAnonymous]
public void TestRabbitMqSubscribe()
public IActionResult TestRabbitMqSubscribe()
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
string QueueName = "testq";
using var channel = _persistentConnection.CreateModel();
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += new AsyncEventHandler<BasicDeliverEventArgs>(
async (a, b) =>
{
var Headers = b.BasicProperties.Headers;
var msgBody = b.Body.ToArray();
bool Dealresult = await Dealer(b.Exchange, b.RoutingKey, msgBody, Headers);
if (Dealresult) channel.BasicAck(b.DeliveryTag, false);
else channel.BasicNack(b.DeliveryTag, false, true);
}
);
channel.BasicConsume(QueueName, false, consumer);
_persistentConnection.StartConsuming("myQueue");
return Ok();
}
private async Task<bool> Dealer(string exchange, string routingKey, byte[] msgBody, IDictionary<string, object> headers)

View File

@ -10,10 +10,35 @@ namespace Blog.Core.EventBus
public interface IRabbitMQPersistentConnection
: IDisposable
{
/// <summary>
/// 是否已经连接
/// </summary>
bool IsConnected { get; }
/// <summary>
/// 尝试重连
/// </summary>
/// <returns></returns>
bool TryConnect();
/// <summary>
/// 创建Model
/// </summary>
/// <returns></returns>
IModel CreateModel();
/// <summary>
/// 发布消息
/// </summary>
/// <param name="message"></param>
/// <param name="exchangeName"></param>
/// <param name="routingKey"></param>
void PublishMessage(string message, string exchangeName, string routingKey);
/// <summary>
/// 订阅消息
/// </summary>
/// <param name="queueName"></param>
void StartConsuming(string queueName);
}
}

View File

@ -7,6 +7,7 @@ using RabbitMQ.Client.Exceptions;
using System;
using System.IO;
using System.Net.Sockets;
using System.Text;
namespace Blog.Core.EventBus
{
@ -162,5 +163,49 @@ namespace Blog.Core.EventBus
TryConnect();
}
/// <summary>
/// 发布消息
/// </summary>
/// <param name="message"></param>
/// <param name="exchangeName"></param>
/// <param name="routingKey"></param>
public void PublishMessage(string message, string exchangeName, string routingKey)
{
using var channel = CreateModel();
channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, true);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: null, body: body);
}
/// <summary>
/// 订阅消息
/// </summary>
/// <param name="queueName"></param>
public void StartConsuming(string queueName)
{
using var channel = CreateModel();
channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += new AsyncEventHandler<BasicDeliverEventArgs>(
async (a, b) =>
{
var Headers = b.BasicProperties.Headers;
var msgBody = b.Body.ToArray();
var message = Encoding.UTF8.GetString(msgBody);
await Task.CompletedTask;
Console.WriteLine("Received message: {0}", message);
//bool Dealresult = await Dealer(b.Exchange, b.RoutingKey, msgBody, Headers);
//if (Dealresult) channel.BasicAck(b.DeliveryTag, false);
//else channel.BasicNack(b.DeliveryTag, false, true);
}
);
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine("Consuming messages...");
}
}
}