.Net Core 教程 Part6 – (19)(20)(21)(22) DDD落地之集成事件

Part6 – (19) RabbitMQ简介

RabbitMQ的基本概念

1、集成事件是服务器间的通信,所以必须借助于第三方服务器作为事件总线。常用的消息中间件有Redis、RabbitMQ、Kafka、ActiveMQ等。

2、RabbitMQ的基本概念:

1)信道(Channel):信道是消息的生产者、消费者和服务器进行通信的虚拟连接。TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP连接的基础上构建了虚拟的信道。我们尽量重复使用TCP连接,而信道则是可以用完了就关闭

2)队列(Queue):用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取数据。

3)交换机(exchange):把消息路由到一个或者多个队列中。

RabbitMQ的routing模式

.Net Core 教程 Part6 – (19)(20)(21)(22)   DDD落地之集成事件
RabbitMQ

生产者把消息发布到交换机中,消息携带一个routingKey属性,交换机会根据routingKey的值把消息发送到一个或者多个队列;消费者会从队列中获取消息;交换机和队列都位于RabbitMQ服务器内部。优点:即使消费者不在线,消费者相关的消息也会被保存到队列中,当消费者上线之后,消费者就可以获取到离线期间错过的消息。

Part6 – (20) .NET中RabbitMQ的基本使用

基本用法

1、安装RabbitMQ服务器。

2、分别创建发送消息的项目和接收消息的控制台项目,这两个项目都安装NuGet包RabbitMQ.Client。

var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";//RabbitMQ服务器地址
factory.DispatchConsumersAsync = true;
string exchangeName = "exchange1";//交换机的名字
string eventName = "myEvent";// routingKey的值
using var conn = factory.CreateConnection();
while(true)
{
    string msg = DateTime.Now.TimeOfDay.ToString();//待发送消息
    using (var channel = conn.CreateModel())//创建信道
    {
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 2; 
        channel.ExchangeDeclare(exchange: exchangeName, type: "direct");//声明交换机
        byte[] body = Encoding.UTF8.GetBytes(msg);
        channel.BasicPublish(exchange: exchangeName,routingKey: eventName, mandatory: true,basicProperties: properties,body: body);//发布消息        
    }
    Console.WriteLine("发布了消息:" + msg);
    Thread.Sleep(1000);
}
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.DispatchConsumersAsync = true;
string exchangeName = "exchange1";
string eventName = "var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume(queue: queueName, autoAck: false,consumer: consumer);
Console.ReadLine();
async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
    try
    {
        var bytes = args.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bytes);
        Console.WriteLine(DateTime.Now + "收到了消息" + msg);
        channel.BasicAck(args.DeliveryTag, multiple: false);
        await Task.Delay(800);
    }
    catch (Exception ex)
    {
        channel.BasicReject(args.DeliveryTag, true);//失败重发
        Console.WriteLine("处理收到的消息出错"+ex);
    }
}";
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();
string queueName = "queue1";
channel.ExchangeDeclare(exchange: exchangeName,type: "direct");
channel.QueueDeclare(queue: queueName,durable: true, exclusive: false,autoDelete: false,arguments: null);
channel.QueueBind(queue: queueName, exchange: exchangeName,routingKey: eventName);//将routingKey绑定到Queue
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
channel.BasicConsume(queue: queueName, autoAck: false,consumer: consumer);
Console.ReadLine();
async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
    try
    {
        var bytes = args.Body.ToArray();
        string msg = Encoding.UTF8.GetString(bytes);
        Console.WriteLine(DateTime.Now + "收到了消息" + msg);
        //Delivery Tag就是消息的编号
        channel.BasicAck(args.DeliveryTag, multiple: false);
        await Task.Delay(800);
    }
    catch (Exception ex)
    {
        channel.BasicReject(args.DeliveryTag, true);//失败重发
        Console.WriteLine("处理收到的消息出错"+ex);
    }
}

Part6 – (21) .NET中简化集成事件的框架

Zack.EventBus使用

1、每次都使用RabbitMQ原始代码太麻烦。参考并改进了微软开源的eShopOnContainers,开发了简化领域事件编程的开发包Zack.EventBus,并且简化了以后迁移到其他MQ服务器的工作量。

2、使用步骤:

1)创建两个ASP.NET Core Web API项目,它们分别是发布集成事件的项目和消费集成事件的项目,然后我们为这两个项目都安装NuGet包Zack.EventBus。

2)在两个项目中的Program.cs文件中的builder.Build()上面增加对IntegrationEventRabbitMQOptions进行配置的代码以及对AddEventBus的调用,然后还要在builder.Build()下面调用

3)在需要发布领域事件的类中注入IEventBus服务,然后调用IEventBus的Publish方法发布消息。

4)创造一个实现了IIntegrationEventHandler接口的类,这个类用来处理收到的事件。通过[EventName(“UserAdded”)]设定类监听的事件。

3、JsonIntegrationEventHandler和DynamicIntegrationEventHandler。

4、RabbitMQ等消息中间件的消息发布和消费的过程是异步的,也就是消息发布者将消息放入消息中间件就返回了,并不会等待消息的消费过程,因此集成事件不仅能够降低微服务之间的耦合度,也还能够起到削峰填谷的作用,避免一个微服务中的突发请求导致其他微服务雪崩的情况出现,而且消息中间件的失败重发机制可以提高消息处理的成功率,从而保证事务的最终一致性

5、最终一致性的事务:需要开发人员对流程进行精细的设计,甚至有时候需要引入人工补偿操作。不像强一致性事务那样是纯技术方案。

6、其他类似开源项目:CAP

using System.Reflection;
using Zack.EventBus;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var eventBusSec = builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
builder.Services.AddEventBus("EventBusDemo1_Q1", Assembly.GetExecutingAssembly());
var app = builder.Build();
app.UseEventBus();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Zack.EventBus;

namespace 发送集成事件.Controllers
{
    [Route("api/[controller]")]
    [ApiController]
    public class DemoController : ControllerBase
    {
        private IEventBus eventBus;
        public DemoController(IEventBus eventBus)
        {
            this.eventBus = eventBus;
        }
        [HttpPost]
        public string Publish()
        {
            eventBus.Publish("UserAdded", new { UserName = "zcq", Age = 18 });
            return "ok";
        }
    }
}
using System.Reflection;
using Zack.EventBus;

var builder = WebApplication.CreateBuilder(args);
builder.Services.AddControllers();
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var eventBusSec = builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
builder.Services.AddEventBus("EventBusDemo1_Q2", Assembly.GetExecutingAssembly());
var app = builder.Build();
app.UseEventBus();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();
using Zack.EventBus;

[EventName("UserAdded")]
public class UserAddesEventHandler : IIntegrationEventHandler
{
	private readonly ILogger<UserAddesEventHandler> logger;
	public UserAddesEventHandler(ILogger<UserAddesEventHandler> logger)
	{
		this.logger = logger;
	}
	public Task Handle(string eventName, string eventData)
	{
		logger.LogInformation("新建了用户:" + eventData);
		return Task.CompletedTask;
	}
}
using Zack.EventBus;

[EventName("UserAdded")]
public class UserAddesEventHandler2 : DynamicIntegrationEventHandler
{
    private readonly ILogger<UserAddesEventHandler2> logger;
    public UserAddesEventHandler2(ILogger<UserAddesEventHandler2> logger)
    {
        this.logger = logger;
    }
    public override Task HandleDynamic(string eventName, dynamic eventData)
    {
        logger.LogInformation($"Dynamic:{eventData.UserName}");
        return Task.CompletedTask;
    }
}
using Zack.EventBus;

[EventName("UserAdded")]
public class UserAddesEventHandler3 : JsonIntegrationEventHandler<UserData>
{
	private readonly ILogger<UserAddesEventHandler3> logger;
	public UserAddesEventHandler3(ILogger<UserAddesEventHandler3> logger)
	{
		this.logger = logger;
	}
	public override Task HandleJson(string eventName, UserData eventData)
	{
		logger.LogInformation($"Json:{eventData.UserName}");
		return Task.CompletedTask;
	}
}
public record UserData(string UserName, int Age);

Part6 – (22) Zack.EventBus源代码讲解

源代码结构

1、YouZack-Vnext/Zack.EventBus

2、RabbitMQConnection类提供的是RabbitMQ连接的失败重连机制

3、SubscriptionsManager类提供的是事件处理的注册和事件的分发机制,从而使得同样一个领域事件可以被微服务内多个事件处理者收到,SubscriptionsManager使用Dictionary来记录注册的事件处理者,其中的AddSubscription(string eventName, Type eventHandlerType)方法用来供把eventHandlerType指定的事件处理类注册为eventName事件的处理类

4、ServicesCollectionExtensions类中的AddEventBus方法用来把集成事件处理类注册到SubscriptionsManager中,它会扫描指定程序集中所有实现了IIntegrationEventHandler接口的类,然后读取类上标注的所有[EventName],把指定监听的事件注册到SubscriptionsManager中;

5、RabbitMQEventBus类用来进行事件的注册和分发

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;

namespace Zack.EventBus
{
    class RabbitMQConnection
    {
        private readonly IConnectionFactory _connectionFactory;
        private IConnection _connection;
        private bool _disposed;
        private readonly object sync_root = new object();

        public RabbitMQConnection(IConnectionFactory connectionFactory)
        {
            _connectionFactory = connectionFactory;
        }

        public bool IsConnected
        {
            get
            {
                return _connection != null && _connection.IsOpen && !_disposed;
            }
        }

        public IModel CreateModel()
        {
            if (!IsConnected)
            {
                throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
            }

            return _connection.CreateModel();
        }

        public void Dispose()
        {
            if (_disposed) return;
            _disposed = true;
            _connection.Dispose();
        }

        public bool TryConnect()
        {
            lock (sync_root)
            {
                _connection = _connectionFactory.CreateConnection();

                if (IsConnected)
                {
                    _connection.ConnectionShutdown += OnConnectionShutdown;
                    _connection.CallbackException += OnCallbackException;
                    _connection.ConnectionBlocked += OnConnectionBlocked;
                    return true;
                }
                else
                {
                    return false;
                }
            }
        }

        private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {
            if (_disposed) return;
            TryConnect();
        }

        void OnCallbackException(object sender, CallbackExceptionEventArgs e)
        {
            if (_disposed) return;
            TryConnect();
        }

        void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
        {
            if (_disposed) return;
            TryConnect();
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;

namespace Zack.EventBus
{
    class SubscriptionsManager
    {
        //key是eventName,值是监听这个事件的实现了IIntegrationEventHandler接口的类型
        private readonly Dictionary<string, List<Type>> _handlers = new Dictionary<string, List<Type>>();

        public event EventHandler<string> OnEventRemoved;

        public bool IsEmpty => !_handlers.Keys.Any();
        public void Clear() => _handlers.Clear();

        /// <summary>
        /// 把eventHandlerType类型(实现了eventHandlerType接口)注册为监听了eventName事件
        /// </summary>
        /// <param name="eventName"></param>
        /// <param name="eventHandlerType"></param>
        public void AddSubscription(string eventName, Type eventHandlerType)
        {
            if (!HasSubscriptionsForEvent(eventName))
            {
                _handlers.Add(eventName, new List<Type>());
            }
            //如果已经注册过,则报错
            if (_handlers[eventName].Contains(eventHandlerType))
            {
                throw new ArgumentException($"Handler Type {eventHandlerType} already registered for '{eventName}'", nameof(eventHandlerType));
            }
            _handlers[eventName].Add(eventHandlerType);
        }

        public void RemoveSubscription(string eventName, Type handlerType)
        {
            _handlers[eventName].Remove(handlerType);
            if (!_handlers[eventName].Any())
            {
                _handlers.Remove(eventName);
                OnEventRemoved?.Invoke(this, eventName);
            }
        }

        /// <summary>
        /// 得到名字为eventName的所有监听者
        /// </summary>
        /// <param name="eventName"></param>
        /// <returns></returns>
        public IEnumerable<Type> GetHandlersForEvent(string eventName) => _handlers[eventName];

        /// <summary>
        /// 是否有类型监听eventName这个事件
        /// </summary>
        /// <param name="eventName"></param>
        /// <returns></returns>
        public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);

    }
}
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

namespace Zack.EventBus
{
    public static class ServicesCollectionExtensions
    {
        public static IServiceCollection AddEventBus(this IServiceCollection services, string queueName,
            params Assembly[] assemblies)
        {
            return AddEventBus(services, queueName, assemblies.ToList());
        }

        public static IServiceCollection AddEventBus(this IServiceCollection services, string queueName,
            IEnumerable<Assembly> assemblies)
        {
            List<Type> eventHandlers = new List<Type>();
            foreach (var asm in assemblies)
            {
                //用GetTypes(),这样非public类也能注册
                var types = asm.GetTypes().Where(t => t.IsAbstract == false && t.IsAssignableTo(typeof(IIntegrationEventHandler)));
                eventHandlers.AddRange(types);
            }
            return AddEventBus(services, queueName, eventHandlers);
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="services"></param>
        /// <param name="queueName">如果多个消费者订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理。为了确保一个应用监听到所有的领域事件,所以不同前端项目的queueName需要不一样。
        /// 因此,对于同一个应用,这个queueName需要保证在多个集群实例和多次运行保持一致,这样可以保证应用重启后仍然能收到没来得及处理的消息。而且这样同一个应用的多个集群实例只有一个能收到一条消息,不会同一条消息被一个应用的多个实例处理。这样消息的处理就被平摊到多个实例中。
        ///</param>
        /// <param name="eventHandlerTypes"></param>
        /// <returns></returns>
        public static IServiceCollection AddEventBus(this IServiceCollection services, string queueName, IEnumerable<Type> eventHandlerTypes)
        {
            foreach (Type type in eventHandlerTypes)
            {
                services.AddScoped(type, type);
            }

            services.AddSingleton<IEventBus>(sp =>
            {
                //如果注册服务的时候就要读取配置,那么可以用AddSingleton的Func<IServiceProvider, TService> 这个重载,
                //因为可以拿到IServiceProvider,省得自己构建IServiceProvider
                var optionMQ = sp.GetRequiredService<IOptions<IntegrationEventRabbitMQOptions>>().Value;
                var factory = new ConnectionFactory()
                {
                    HostName = optionMQ.HostName,
                    DispatchConsumersAsync = true
                };
                //eventBus归DI管理,释放的时候会调用Dispose
                //eventbus的Dispose中会销毁RabbitMQConnection
                RabbitMQConnection mqConnection = new RabbitMQConnection(factory);
                var serviceScopeFactory = sp.GetRequiredService<IServiceScopeFactory>();
                var eventBus = new RabbitMQEventBus(mqConnection, serviceScopeFactory, optionMQ.ExchangeName, queueName);
                //遍历所有实现了IIntegrationEventHandler接口的类,然后把它们批量注册到eventBus
                foreach (Type type in eventHandlerTypes)
                {
                    //获取类上标注的EventNameAttribute,EventNameAttribute的Name为要监听的事件的名字
                    //允许监听多个事件,但是不能为空
                    var eventNameAttrs = type.GetCustomAttributes<EventNameAttribute>();
                    if (eventNameAttrs.Any() == false)
                    {
                        throw new ApplicationException($"There shoule be at least one EventNameAttribute on {type}");
                    }
                    foreach (var eventNameAttr in eventNameAttrs)
                    {
                        eventBus.Subscribe(eventNameAttr.Name, type);
                    }
                }
                return eventBus;
            });
            return services;
        }
    }
}
using Microsoft.AspNetCore.Builder;
using System;

namespace Zack.EventBus
{
    public static class ApplicationBuilderExtensions
    {
        public static IApplicationBuilder UseEventBus(this IApplicationBuilder appBuilder)
        {
            //获得IEventBus一次,就会立即加载IEventBus,这样扫描所有的EventHandler,保证消息及时消费
            object? eventBus = appBuilder.ApplicationServices.GetService(typeof(IEventBus));
            if (eventBus == null)
            {
                throw new ApplicationException("找不到IEventBus实例");
            }
            return appBuilder;
        }
    }
}
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Diagnostics;
using System.Reflection;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

namespace Zack.EventBus;
class RabbitMQEventBus : IEventBus, IDisposable
{
    private IModel _consumerChannel;
    private readonly string _exchangeName;
    private string _queueName;
    private readonly RabbitMQConnection _persistentConnection;
    private readonly SubscriptionsManager _subsManager;
    private readonly IServiceProvider _serviceProvider;
    private readonly IServiceScope serviceScope;

    public RabbitMQEventBus(RabbitMQConnection persistentConnection,
        IServiceScopeFactory serviceProviderFactory, string exchangeName, string queueName)
    {
        this._persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
        this._subsManager = new SubscriptionsManager();
        this._exchangeName = exchangeName;
        this._queueName = queueName;

        //因为RabbitMQEventBus是Singleton对象,而它创建的IIntegrationEventHandler以及用到的IIntegrationEventHandler用到的服务
        //大部分是Scoped,因此必须这样显式创建一个scope,否则在getservice的时候会报错:Cannot resolvefrom root provider because it requires scoped service
        this.serviceScope = serviceProviderFactory.CreateScope();
        this._serviceProvider = serviceScope.ServiceProvider;
        this._consumerChannel = CreateConsumerChannel();
        this._subsManager.OnEventRemoved += SubsManager_OnEventRemoved; ;
    }

    private void SubsManager_OnEventRemoved(object? sender, string eventName)
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        using (var channel = _persistentConnection.CreateModel())
        {
            channel.QueueUnbind(queue: _queueName,
                exchange: _exchangeName,
                routingKey: eventName);

            if (_subsManager.IsEmpty)
            {
                _queueName = string.Empty;
                _consumerChannel.Close();
            }
        }
    }

    public void Publish(string eventName, object? eventData)
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }
        //Channel 是建立在 Connection 上的虚拟连接
        //创建和销毁 TCP 连接的代价非常高,
        //Connection 可以创建多个 Channel ,Channel 不是线程安全的所以不能在线程间共享。
        using (var channel = _persistentConnection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: _exchangeName, type: "direct");

            byte[] body;
            if (eventData == null)
            {
                body = new byte[0];
            }
            else
            {
                JsonSerializerOptions options = new JsonSerializerOptions
                {
                    WriteIndented = true
                };
                body = JsonSerializer.SerializeToUtf8Bytes(eventData, eventData.GetType(), options);
            }
            var properties = channel.CreateBasicProperties();
            properties.DeliveryMode = 2; // persistent
            channel.BasicPublish(
                exchange: _exchangeName,
                routingKey: eventName,
                mandatory: true,
                basicProperties: properties,
                body: body);
        }
    }

    public void Subscribe(string eventName, Type handlerType)
    {
        CheckHandlerType(handlerType);
        DoInternalSubscription(eventName);
        _subsManager.AddSubscription(eventName, handlerType);
        StartBasicConsume();
    }

    private void DoInternalSubscription(string eventName)
    {
        var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
        if (!containsKey)
        {
            if (!_persistentConnection.IsConnected)
            {
                _persistentConnection.TryConnect();
            }
            _consumerChannel.QueueBind(queue: _queueName,
                                exchange: _exchangeName,
                                routingKey: eventName);
        }
    }

    private void CheckHandlerType(Type handlerType)
    {
        if (!typeof(IIntegrationEventHandler).IsAssignableFrom(handlerType))
        {
            throw new ArgumentException($"{handlerType} doesn't inherit from IIntegrationEventHandler", nameof(handlerType));
        }
    }

    public void Unsubscribe(string eventName, Type handlerType)
    {
        CheckHandlerType(handlerType);
        _subsManager.RemoveSubscription(eventName, handlerType);
    }

    public void Dispose()
    {
        if (_consumerChannel != null)
        {
            _consumerChannel.Dispose();
        }
        _subsManager.Clear();
        this._persistentConnection.Dispose();
        this.serviceScope.Dispose();
    }

    private void StartBasicConsume()
    {
        if (_consumerChannel != null)
        {
            var consumer = new AsyncEventingBasicConsumer(_consumerChannel);
            consumer.Received += Consumer_Received;
            _consumerChannel.BasicConsume(
                queue: _queueName,
                autoAck: false,
                consumer: consumer);
        }
    }

    private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
    {
        var eventName = eventArgs.RoutingKey;//这个框架中,就是用eventName当RoutingKey
        var message = Encoding.UTF8.GetString(eventArgs.Body.Span);//框架要求所有的消息都是字符串的json
        try
        {
            await ProcessEvent(eventName, message);
            //如果在获取消息时采用不自动应答,但是获取消息后不调用basicAck,
            //RabbitMQ会认为消息没有投递成功,不仅所有的消息都会保留到内存中,
            //而且在客户重新连接后,会将消息重新投递一遍。这种情况无法完全避免,因此EventHandler的代码需要幂等
            _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
            //multiple:批量确认标志。如果值为true,则执行批量确认,此deliveryTag之前收到的消息全部进行确认; 如果值为false,则只对当前收到的消息进行确认
        }
        catch (Exception ex)
        {
            //requeue:表示如何处理这条消息,如果值为true,则重新放入RabbitMQ的发送队列,如果值为false,则通知RabbitMQ销毁这条消息
            //_consumerChannel.BasicReject(eventArgs.DeliveryTag, true);
            Debug.Fail(ex.ToString());
        }
    }

    private IModel CreateConsumerChannel()
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        var channel = _persistentConnection.CreateModel();
        channel.ExchangeDeclare(exchange: _exchangeName,
                                type: "direct");

        channel.QueueDeclare(queue: _queueName,
                             durable: true,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);

        channel.CallbackException += (sender, ea) =>
        {
            /*
            _consumerChannel.Dispose();
            _consumerChannel = CreateConsumerChannel();
            StartBasicConsume();*/
            Debug.Fail(ea.ToString());
        };

        return channel;
    }

    private async Task ProcessEvent(string eventName, string message)
    {
        if (_subsManager.HasSubscriptionsForEvent(eventName))
        {
            var subscriptions = _subsManager.GetHandlersForEvent(eventName);
            foreach (var subscription in subscriptions)
            {
                //各自在不同的Scope中,避免DbContext等的共享造成如下问题:
                //The instance of entity type cannot be tracked because another instance
                using var scope = this._serviceProvider.CreateScope();
                IIntegrationEventHandler? handler = scope.ServiceProvider.GetService(subscription) as IIntegrationEventHandler;
                if (handler == null)
                {
                    throw new ApplicationException($"无法创建{subscription}类型的服务");
                }
                await handler.Handle(eventName, message);
            }
        }
        else
        {
            string entryAsm = Assembly.GetEntryAssembly().GetName().Name;
            Debug.WriteLine($"找不到可以处理eventName={eventName}的处理程序,entryAsm:{entryAsm}");
        }
    }
}
using System.Text.Json;
using System.Threading.Tasks;
namespace Zack.EventBus
{
    public abstract class JsonIntegrationEventHandler<T> : IIntegrationEventHandler
    {
        public Task Handle(string eventName, string json)
        {
            T? eventData = JsonSerializer.Deserialize<T>(json);
            return HandleJson(eventName, eventData);
        }
        public abstract Task HandleJson(string eventName, T? eventData);
    }
}
namespace Zack.EventBus
{
    public class IntegrationEventRabbitMQOptions
    {
        public string HostName { get; set; }
        public string ExchangeName { get; set; }
    }
}
using System.Threading.Tasks;

namespace Zack.EventBus
{
    public interface IIntegrationEventHandler
    {
        //因为消息可能会重复发送,因此Handle内的实现需要是幂等的
        Task Handle(string eventName, string eventData);
    }
}
using System;

namespace Zack.EventBus
{
    public interface IEventBus
    {
        void Publish(string eventName, object? eventData);
        void Subscribe(string eventName, Type handlerType);
        void Unsubscribe(string eventName, Type handlerType);
    }
}
using System;

namespace Zack.EventBus
{
    [AttributeUsage(AttributeTargets.Class, AllowMultiple = true)]
    public class EventNameAttribute : Attribute
    {
        public EventNameAttribute(string name)
        {
            this.Name = name;
        }
        public string Name { get; init; }
    }
}
using Dynamic.Json;
using System.Threading.Tasks;

namespace Zack.EventBus
{
    public abstract class DynamicIntegrationEventHandler : IIntegrationEventHandler
    {
        public Task Handle(string eventName, string eventData)
        {
            //https://github.com/dotnet/runtime/issues/53195
            //https://github.com/dotnet/core/issues/6444
            //.NET 6目前不支持把json反序列化为dynamic,本来preview 4支持,但是在preview 7又去掉了
            //所以暂时用Dynamic.Json来实现。
            dynamic dynamicEventData = DJson.Parse(eventData);
            return HandleDynamic(eventName, dynamicEventData);
        }

        public abstract Task HandleDynamic(string eventName, dynamic eventData);
    }
}

本文版权归个人技术分享站点所有,发布者:chaoqiang,转转请注明出处:http://www.zhengchaoqiang.com/1669.html

(2)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
chaoqiangchaoqiang
上一篇 2022-04-04 19:12
下一篇 2022-04-05 11:05

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

近期个人博客正在迁移中,原博客请移步此处,抱歉!