欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

[Abp 源码分析]九、事件总线

程序员文章站 2022-07-01 14:48:43
0.简介 事件总线就是订阅/发布模式的一种实现,本质上事件总线的存在是为了降低耦合而存在的。 从上图可以看到事件由发布者发布到事件总线处理器当中,然后经由事件总线处理器调用订阅者的处理方法,而发布者和订阅者之间并没有耦合关系。 像 Windows 本身的设计也是基于事件驱动,当用户点击了某个按钮,那 ......

0.简介

事件总线就是订阅/发布模式的一种实现,本质上事件总线的存在是为了降低耦合而存在的。

[Abp 源码分析]九、事件总线

从上图可以看到事件由发布者发布到事件总线处理器当中,然后经由事件总线处理器调用订阅者的处理方法,而发布者和订阅者之间并没有耦合关系。

像 Windows 本身的设计也是基于事件驱动,当用户点击了某个按钮,那么就会触发相应的按钮点击事件,而程序只需要监听这个按钮点击事件即可进行相应的处理,而事件被触发的时候往往都会附带相应的事件源,事件所产生的数据等。

还是以按钮被点击为例,该事件被触发的时候会装填上触发时间,被谁触发的数据放在一个 EventArgs 内部,然后将其存放到事件处理器中,然后处理器根据事件的类型去查找订阅了该事件类型的对象,附带上事件数据去调用这些订阅者对象的处理方法。

Abp 本身也实现了事件总线,并且在框架内部也实现了丰富的事件类型,例如实体更新事件、异常事件等等。

注意:在下文当中处理器的含义等同于订阅者,请阅读的时候自行切换。

0.1.使用方法

在引用了 Abp 框架的项目当中使用事件总线相当简单,只需要直接注入 IEventBus 即可触发相应的事件。如果你想要监听某个事件,并且你也想在事件被触发的时候进行处理,那么直接继承自 IEventHandler<TEventData> / IAsyncEventHandler<TEventData> 实现其接口方法 HandleEvent() 即可。

比如说,我们首先定义了一个 TestEventData 的事件,如下:

/// <summary>
/// 测试事件
/// </summary>
public class TestEventData : EventData
{
    public TestEventData(string code)
    {
        Code = code;
    }

    /// <summary>
    /// 待验证的编码
    /// </summary>
    public string Code { get; }
}

很简单,这个事件触发的时候会传递一个 string 类型的 Code 参数。

之后我们使用 TestEventHandler 订阅这个事件,当然订阅的方式很简单,实现接口即可。

public class TestEventHandler : IAsyncEventHandler<TestEventData>, IEventHandler<TestEventData>
{
    public Task HandleEventAsync(TestEventData eventData)
    {
        if (eventData.Code == "1") Console.WriteLine("# 异步测试,编码正确");

        Console.WriteLine("# 异步测试,编码错误");
        return Task.FromResult(0);
    }

    public void HandleEvent(TestEventData eventData)
    {
        if (eventData.Code == "1") Console.WriteLine("# 同步测试,编码正确");

        Console.WriteLine("# 同步测试,编码错误");
    }
}

Abp 在底层会扫描实现了 IEventHandler<TEventData> / IAsyncEventHandler<TEventData> 这两个接口的类型,将其自动注册为订阅者。

当然你也可以手动订阅:

public class TestAppService : ApplicationService
{
    private readonly IEventBus _eventBus;

    public TestAppService(IEventBus eventBus)
    {
        _eventBus = eventBus;
    }

    public async Task TestMethod()
    {
        // 同步触发
        _eventBus.Trigger(new TestEventData("Code1"));
        // 异步触发,3.6.x 版本新增的
        await _eventBus.TriggerAsync(new TestEventData("Code1"));

        // 手动注册事件范例
        _eventBus.Register<TestEventData, TestEventHandler>();
    }
}

这里的 Register() 方法会让你传入一个事件数据类型,以及该事件对应的处理器。

同一个事件可以被多个对象所订阅,只要该对象实现 IEventHandler<TEventData> / IAsyncEventHandler<TEventData> 接口或者是显式地被 IEventBus.Register() 注册,他们都会在事件被触发的时候调用。

2.启动流程

按照惯例我们来分析一下 Abp 针对事件总线的实现,看一下它的整体启动流程,什么时候被注入,被初始化。

事件总线比起其他 Abp 基础设施来说他的注册点就一个,在 EventBusInstaller 里面,包含了针对 IEventBus 的注册以及对实现了 IEventHandler 处理器的注册。
EventBusInstaller 在 Abp 框架的核心模块 AbpKernelModuleInitialize 被注册调用:

public override void Initialize()
{
    // ...其他代码

    IocManager.IocContainer.Install(new EventBusInstaller(IocManager));

    // ... 其他代码
}

里面的 Install() 方法做了两个动作,第一是根据事件总线配置来决定 IEventBus 的注册方式,第二则是将订阅者(事件处理器)通过 IEventBus.Register() 方法自动放到事件总线管理器里面。

public void Install(IWindsorContainer container, IConfigurationStore store)
{
    // 这里是注入的配置类
    if (_eventBusConfiguration.UseDefaultEventBus)
    {
        container.Register(
            Component.For<IEventBus>().UsingFactoryMethod(() => EventBus.Default).LifestyleSingleton()
            );
    }
    else
    {
        container.Register(
            Component.For<IEventBus>().ImplementedBy<EventBus>().LifestyleSingleton()
            );
    }

    // 解析事件总线管理器
    _eventBus = container.Resolve<IEventBus>();

    // 注册订阅者对象
    container.Kernel.ComponentRegistered += Kernel_ComponentRegistered;
}

Emmmm,具体的代码分析请看下面:

private void Kernel_ComponentRegistered(string key, IHandler handler)
{
    // 判断当前注入的对象是否实现了 IEventHandler 接口,没有实现则跳过
    if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(handler.ComponentModel.Implementation))
    {
        return;
    }

    // 获得当前注入对象实现的所有接口
    var interfaces = handler.ComponentModel.Implementation.GetTypeInfo().GetInterfaces();
    // 遍历获取到的所有接口
    foreach (var @interface in interfaces)
    {
        // 如果当前被遍历的接口类型不是 IEventHandler 或者不是从 IEventHandler 继承的,则跳过
        if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
        {
            continue;
        }

        // 到这里获取这个 IEventHandler 处理器的泛型参数
        var genericArgs = @interface.GetGenericArguments();
        // 并且它的泛型参数应该只有一个
        if (genericArgs.Length == 1)
        {
            // 根据 IEventHandler 的定义,拿到的泛型参数肯定就是事件数据类型啦
            // 第二个参数就是一个 Handler 的工厂咯,每次触发事件的时候都会从这个
            // 工厂解析出具体的事件处理器来响应事件的操作。
            _eventBus.Register(genericArgs[0], new IocHandlerFactory(_iocResolver, handler.ComponentModel.Implementation));
        }
    }
}

目前看来还是十分简单的。

3.代码分析

3.1 事件总线管理器

整个事件总线的核心就是这个管理器(IEventBus/EventBus),事件的注册,事件的触发,所有这些东西都是由它来提供的,其实嘛事件总线没你想象得那么复杂。

它的基本原理很简单,就是用户向事件总线管理器注册我想要触发的事件,还有响应我事件的订阅者,将其放在一个字典里面。当 A 对象在数据库 断开连接 的时候,通过事件总线管理器触发 断开连接事件,事件总线管理器就会从之前注册的字典,根据触发时候传递的类型拿到响应的处理器集合,遍历这个集合调用对应的方法。

说这么多,我们来看一下代码吧,首先看看事件总线管理器的定义(当然接口太多,这里是精简过的):

public interface IEventBus
{
    // 注册并订阅事件
    IDisposable Register(Type eventType, IEventHandlerFactory factory);
    // 这里没有异步注册的原因是它最后还是会调用上面这个方法

    // 取消事件的订阅,这里传入的是一个 Action
    void Unregister<TEventData>(Action<TEventData> action) where TEventData : IEventData;
    // 这里传入的是一个 IEventHandler
    void Unregister<TEventData>(IEventHandler<TEventData> handler) where TEventData : IEventData;
    
    // 异步取消事件订阅
    void AsyncUnregister<TEventData>(Func<TEventData, Task> action) where TEventData : IEventData;

    // 同样是取消事件订阅
    void Unregister(Type eventType, IEventHandler handler);
    void Unregister(Type eventType, IEventHandlerFactory factory);
    void UnregisterAll(Type eventType);

    // 触发事件
    void Trigger(Type eventType, object eventSource, IEventData eventData);
    // 异步触发事件
    Task TriggerAsync(Type eventType, object eventSource, IEventData eventData);
}

Emm,看了一下,大概就分为三类,注册事件取消事件的订阅触发事件,其他定义的接口大多都是不同形式的重载,本质上还是会调用到上述方法的。

首先在事件总线管理器内部有一个字典,这个字典就是我们刚才所提到的事件总线维护的事件字典,大概长这个样子:

private readonly ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories;

可以看到,这个字典的 Key 是一个 Type 类型,其实就是我们所注册的事件类型罢了,后面那个呢就是事件处理器的工厂。那为什么这个工厂会用一个 List 来存储呢?

原因有两点:

  1. 因为我们对应的事件处理器的生命周期与生成方式都有所不同,比如说 Abp 它自己本身就提供了IocHandlerFactoryTransientEventHandlerFactorySingleInstanceHandlerFactory 这三种实现。
  2. 因为一个事件可能会被多个处理器所订阅,那么一个处理器拥有一个工厂,所以会是一个集合。

3.1.1 注册事件

在默认的 Register() 方法里面就是使用的 IocHandlerFactory 来进行注册事件的,如果你需要手动注册事件呢,可以使用签名为:

public IDisposable Register(Type eventType, IEventHandlerFactory factory);

的方法,来传入自己实现的处理器工厂或者是 Abp 提供的事件处理器工厂。

看了它的定义之后,我们来看一下它的具体实现,首先来看看注册事件的 Register() 方法:

public IDisposable Register(Type eventType, IEventHandlerFactory factory)
{
    // 获得指定事件类型的工厂集合,然后往这个集合添加一个事件处理器工厂
    GetOrCreateHandlerFactories(eventType)
        .Locking(factories => factories.Add(factory));

    // Emm,这里面就是一个 Dispose 方法,用于释放创建好的工厂对象,里面的 Dispose 方法
    // 最终会调用 IEventBus 的 UnRegister 方法来卸载工厂
    return new FactoryUnregistrar(this, eventType, factory);
}

private List<IEventHandlerFactory> GetOrCreateHandlerFactories(Type eventType)
{
    // 根据事件类型创建/获取一个事件处理器工厂集合
    return _handlerFactories.GetOrAdd(eventType, (type) => new List<IEventHandlerFactory>());
}

可以看到调用了注册方法之后,它返回了一个 FactoryUnregistrar ,查看它的定义如下:

internal class FactoryUnregistrar : IDisposable
{
    private readonly IEventBus _eventBus;
    private readonly Type _eventType;
    private readonly IEventHandlerFactory _factory;

    public FactoryUnregistrar(IEventBus eventBus, Type eventType, IEventHandlerFactory factory)
    {
        _eventBus = eventBus;
        _eventType = eventType;
        _factory = factory;
    }

    public void Dispose()
    {
        _eventBus.Unregister(_eventType, _factory);
    }
}

很简单的一个类,重点就是在 Dispose() 内部调用了 IEventBusUnregister() 方法,下面就会讲解这东西。

3.1.2 取消事件的订阅

接着是 UnRegister() 方法,UnRegister 方法有很多个,一般分为两类,第一是取消订阅,第二就是卸载工厂。

public void Unregister<TEventData>(Action<TEventData> action) where TEventData : IEventData
{
    // 确保不为空
    Check.NotNull(action, nameof(action));

    // 根据类型得到该类型所有的事件处理器集合
    GetOrCreateHandlerFactories(typeof(TEventData))
        // 使用 lock 加锁,防止线程同步问题
        .Locking(factories =>
        {
            // 调用 List 的 RemoveAll() 方法清除指定条件的工厂
            factories.RemoveAll(
                factory =>
                {
                    // 判断工厂是否为单例工厂
                    var singleInstanceFactory = factory as SingleInstanceHandlerFactory;
                    // 如果不是单例工厂则不移除
                    if (singleInstanceFactory == null)
                    {
                        return false;
                    }
                    
                    // 如果是单例工厂,拿到其内部的具体事件处理器,并强制换为 ActionEventHandler
                    var actionHandler = singleInstanceFactory.HandlerInstance as ActionEventHandler<TEventData>;
                    // 为空的话,不移除
                    if (actionHandler == null)
                    {
                        return false;
                    }

                   // 判断传入的处理逻辑是否与事件处理器逻辑相等,相等则移除
                    return actionHandler.Action == action;
                });
        });
}

// 取消订阅的另一种实现,只是针对 SingleInstanceHandlerFactory 进行了处理
public void Unregister(Type eventType, IEventHandler handler)
{
    GetOrCreateHandlerFactories(eventType)
        .Locking(factories =>
                    {
                        factories.RemoveAll(
                            factory =>
                            factory is SingleInstanceHandlerFactory &&
                            (factory as SingleInstanceHandlerFactory).HandlerInstance == handler
                        );
                    });
}

// 第二种情况,卸载工厂,也就是 Register() 之后返回的 FactoryUnregistrar 释放时调用的方法
public void Unregister(Type eventType, IEventHandlerFactory factory)
{
    // 根据传入的类型,获得事件处理器工厂集合,移除相应工厂
    GetOrCreateHandlerFactories(eventType).Locking(factories => factories.Remove(factory));
}

在上方代码可以看到,似乎 Unregister() 方法只针对 SingleInstanceHandlerFactory 工厂进行了处理,而没有处理 IocHandlerFactoryTransientEventHandlerFactory

这是因为在 IEventBus 当中实现了这两个方法:

IDisposable Register(Type eventType, IEventHandler handler);
IDisposable Register<TEventData>(Action<TEventData> action) where TEventData : IEventData;

可以看到这两个方法都没有传入工厂,第一个允许你传入一个事件处理器对象,第二个则是让你传入一个 Action 作为其事件订阅者。

看看实现:

public IDisposable Register<TEventData>(Action<TEventData> action) where TEventData : IEventData
{
    // new 了一个 ActionEventHandler 作为处理器
    return Register(typeof(TEventData), new ActionEventHandler<TEventData>(action));
}

public IDisposable Register<TEventData>(IEventHandler<TEventData> handler) where TEventData : IEventData
{
    // 传入具体的处理器对象进行注册
    return Register(typeof(TEventData), handler);
}

public IDisposable Register(Type eventType, IEventHandler handler)
{
    // 使用 SingleInstanceHandlerFactory 工厂进行注册。
    return Register(eventType, new SingleInstanceHandlerFactory(handler));
}

因为单例工厂与其他两个工厂不一样,单例工厂的生命周期贯穿整个程序的生命周期,也就是说除非程序被结束,那么单例工厂内部的事件处理器就会一直存在,所以在 UnRegister() 方法内部只会针对 SingleInstanceHandlerFactory 工厂进行处理。

TransientEventHandlerFactory

IocHandlerFactory 工厂产生的对象的生命周期是随着具体类型在被注入时候的生命周期所决定,有可能是瞬时对象,也有可能是单例对象,下文会详细解说。

3.1.3 触发事件

当事件的发布者需要发布(触发)一个事件的时候,会调用 IEventBus 提供的 Trigger()/TriggerAsync() 方法。

然后事件总线管理器从自己的字典内匹配对应的事件,得到对应的事件处理器工厂集合,然后呢使用工厂产生具体的处理器对象,调用其 HandleEvent / HandleEventAsync 方法,执行完成之后释放对象。

public void Trigger(Type eventType, object eventSource, IEventData eventData)
{
    // 异常集合
    var exceptions = new List<Exception>();

    eventData.EventSource = eventSource;

    // 获得所有需要触发的处理器工厂,遍历传入的事件类型以及他的子类事件
    foreach (var handlerFactories in GetHandlerFactories(eventType))
    {
        // 遍历事件类型绑定的工厂集合
        foreach (var handlerFactory in handlerFactories.EventHandlerFactories)
        {
            // 获得处理器类型
            var handlerType = handlerFactory.GetHandlerType();

            // 如果是异步处理器,以同步方式运行
            if (IsAsyncEventHandler(handlerType))
            {
                AsyncHelper.RunSync(() => TriggerAsyncHandlingException(handlerFactory, handlerFactories.EventType, eventData, exceptions));
            }
            else if (IsEventHandler(handlerType))
            {
                // 调用处理器的处理方法,并回收异常信息
                TriggerHandlingException(handlerFactory, handlerFactories.EventType, eventData, exceptions);
            }
            else
            {
                // 说明这个事件没有对应的处理器实现,抛出异常
                var message = $"Event handler to register for event type {eventType.Name} does not implement IEventHandler<{eventType.Name}> or IAsyncEventHandler<{eventType.Name}> interface!";
                exceptions.Add(new AbpException(message));
            }
        }
    }

    // 处理继承事件的情况
    if (eventType.GetTypeInfo().IsGenericType &&
        eventType.GetGenericArguments().Length == 1 &&
        typeof(IEventDataWithInheritableGenericArgument).IsAssignableFrom(eventType))
    {
        var genericArg = eventType.GetGenericArguments()[0];
        var baseArg = genericArg.GetTypeInfo().BaseType;
        if (baseArg != null)
        {
            var baseEventType = eventType.GetGenericTypeDefinition().MakeGenericType(baseArg);
            var constructorArgs = ((IEventDataWithInheritableGenericArgument)eventData).GetConstructorArgs();
            var baseEventData = (IEventData)Activator.CreateInstance(baseEventType, constructorArgs);
            baseEventData.EventTime = eventData.EventTime;
            Trigger(baseEventType, eventData.EventSource, baseEventData);
        }
    }

    if (exceptions.Any())
    {
        // 如果产生的异常数量为 1 个的话,重新抛出具体的异常信息
        if (exceptions.Count == 1)
        {
            exceptions[0].ReThrow();
        }

        // 如果在执行过程中产生了多个异常,将异常集合放在内部异常当中并抛出
        throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions);
    }
}

// 筛选所有需要触发的事件类型,并将其封装为 EventTypeWithEventHandlerFactories
private IEnumerable<EventTypeWithEventHandlerFactories> GetHandlerFactories(Type eventType)
{
    var handlerFactoryList = new List<EventTypeWithEventHandlerFactories>();

    foreach (var handlerFactory in _handlerFactories.Where(hf => ShouldTriggerEventForHandler(eventType, hf.Key)))
    {
        handlerFactoryList.Add(new EventTypeWithEventHandlerFactories(handlerFactory.Key, handlerFactory.Value));
    }

    return handlerFactoryList.ToArray();
}

// 判断传入的类型是否是事件类型的子类
private static bool ShouldTriggerEventForHandler(Type eventType, Type handlerType)
{
    if (handlerType == eventType)
    {
        return true;
    }

    if (handlerType.IsAssignableFrom(eventType))
    {
        return true;
    }

    return false;
}

// 拿着具体的处理器工厂去执行处理器的处理方法
private void TriggerHandlingException(IEventHandlerFactory handlerFactory, Type eventType, IEventData eventData, List<Exception> exceptions)
{
    // 获得一个新鲜的处理器对象
    var eventHandler = handlerFactory.GetHandler();
    try
    {
        if (eventHandler == null)
        {
            throw new ArgumentNullException($"Registered event handler for event type {eventType.Name} is null!");
        }

        var handlerType = typeof(IEventHandler<>).MakeGenericType(eventType);

        // 从这个处理器获取到处理方法
        var method = handlerType.GetMethod(
            "HandleEvent",
            new[] { eventType }
        );

        // 调用处理方法,并传入事件数据
        method.Invoke(eventHandler, new object[] { eventData });
    }
    // 产生异常进行处理
    catch (TargetInvocationException ex)
    {
        exceptions.Add(ex.InnerException);
    }
    catch (Exception ex)
    {
        exceptions.Add(ex);
    }
    finally
    {
        // 释放资源
        handlerFactory.ReleaseHandler(eventHandler);
    }
}

3.2 处理器工厂

所有事件所对应的处理器对象都是由工厂所创建的,当一个事件被触发,事件总线管理器就会从事件类型所对应的工厂产生一个相应的处理器对象执行调用。

简而言之,每个事件处理器都拥有一个单独的工厂。

其接口定义如下:

public interface IEventHandlerFactory
{
    // 获得一个事件处理器对象
    IEventHandler GetHandler();

    // 获得当前工厂所产生的处理器类型
    Type GetHandlerType();

    // 释放指定的处理器对象
    void ReleaseHandler(IEventHandler handler);
}
具体实现 生命周期 描述
TransientEventHandlerFactory 瞬时 工厂产生的事件处理器生命周期是瞬时的,是一个标准
的可以被 GC 回收的对象。
SingleInstanceHandlerFactory 单例 该工厂产生的对象都会被保存在一个 Instance 内部,每
次生成对象的时候都会使用该 Instance 的值。
IocHandlerFactory 由类型注册时决定 在使用 IocHandlerFactory 的时候,会传入事件处理
器,该工厂在创建事件处理器对象的时候会从 Ioc 容器当中
解析对应的对象出来,而该对象的生命周期取决于注册时
的定义。

4.扩展

4.1 实体更新事件

Abp 在仓储每次执行 CRUD 操作的时候都会自动触发响应的实体更新事件,这些事件的触发都存放在 EntityChangeEventHelper 类当中,一共有以下几个事件,你订阅该这些事件之后就会在实体产生更改的时候被触发。

  • EntityChangedEventData<TEntity> 实体被更改的时候触发。

  • EntityCreatedEventData<TEntity> 实体创建完成后触发。
  • EntityCreatingEventData<TEntity> 实体创建时被触发。
  • EntityDeletedEventData<TEntity> 实体删除完成后触发。
  • EntityDeletingEventData<TEntity> 实体删除时被触发。
  • EntityUpdatedEventData<TEntity> 实体更新后触发。
  • EntityUpdatingEventData<TEntity> 实体更新时被触发。

public class TestHandler : IEventHandler<EntityChangedEventData<TestEntity>>
{
    public void HandleEvent(EntityChangedEventData<TestEntity> eventData)
    {
        Console.WriteLine($"测试实体,ID为 {eventDate.Entity.Id} 被更改了");
    }
}

4.2 异常事件

Abp 在运行期间遇到的异常也会自动触发异常事件,其类型为 AbpHandledExceptionDataExceptionData