欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

程序员文章站 2022-05-18 15:37:33
一、简要介绍 ABP vNext 封装了两种事件总线结构,第一种是 ABP vNext 自己实现的本地事件总线,这种事件总线无法跨项目发布和订阅。第二种则是分布式事件总线,ABP vNext 自己封装了一个抽象层进行定义,并使用 RabbitMQ 编写了一个基本实现。 在使用方式上,两种事件总线的作 ......

一、简要介绍

abp vnext 封装了两种事件总线结构,第一种是 abp vnext 自己实现的本地事件总线,这种事件总线无法跨项目发布和订阅。第二种则是分布式事件总线,abp vnext 自己封装了一个抽象层进行定义,并使用 rabbitmq 编写了一个基本实现。

在使用方式上,两种事件总线的作用基本相同。

事件总线分布在两个模块,在 volo.abp.eventbus 模块内部,定义了事件总线的抽象接口,以及本地事件总线 (ilocaleventbus) 的实现。分布式事件总线的具体实现,是在 volo.abp.eventbus.rabbitmq 模块内部进行定义,从项目名称可以看出来,这个模块是基于 rabbitmq 消息队列实现的。

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

但是该项目并不是直接引用 rabbitmq.client 包,而是在 volo.abp.rabbitmq 项目内部引用。这是因为除了分布式事件总线以外,abp 还基于 rabbitmq 实现了一个后台作业管理器。

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

abp vnext 框架便将一些对象抽象出来,放在 volo.abp.rabbitmq 项目内部进行定义和实现。

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

二、源码分析

2.1 事件处理器的注册

分析源码,首先从一个项目的模块开始,volo.abp.eventbus 库的模块 abpeventbusmodule 只干了一件事情。在组件注册的时候,根据组件的实现接口 (ilocaleventhandleridistributedeventhandler) 不同,将其赋值给 abplocaleventbusoptionsabpdistributedeventbusoptionshandlers 属性。

也就是说,开发人员定义的事件处理程序 (handler) 都会在依赖注入的时候,都会将其类型 (type) 添加到事件总线的配置类当中,方便后续进行使用。

2.2 事件总线的接口

通过事件总线模块的单元测试我们可以知道,事件的发布与订阅都是通过 ieventbus 的两个子接口 (ilocaleventbus/idistributedeventbus) 进行的。在 ieventbus 接口的定义中,有三种行为,分别是 发布订阅取消订阅

对于 ilocaleventbus 接口和 idistributedeventbus 接口来说,它们都提供了一个,针对本地事件处理器和分布式处理器的特殊订阅方法。

ilocaleventbus

/// <summary>
/// defines interface of the event bus.
/// </summary>
public interface ilocaleventbus : ieventbus
{
    /// <summary>
    /// registers to an event. 
    /// same (given) instance of the handler is used for all event occurrences.
    /// </summary>
    /// <typeparam name="tevent">event type</typeparam>
    /// <param name="handler">object to handle the event</param>
    idisposable subscribe<tevent>(ilocaleventhandler<tevent> handler)
        where tevent : class;
}

idistributedeventbus

public interface idistributedeventbus : ieventbus
{
    /// <summary>
    /// registers to an event. 
    /// same (given) instance of the handler is used for all event occurrences.
    /// </summary>
    /// <typeparam name="tevent">event type</typeparam>
    /// <param name="handler">object to handle the event</param>
    idisposable subscribe<tevent>(idistributedeventhandler<tevent> handler)
        where tevent : class;
}

2.3 事件总线基本流程和实现

同其他模块一样,因为有分布式事件总线和本地事件总线,abp vnext 同样抽象了一个 eventbusbase 类型,作为它们的基类实现。

一般的流程,我们是先定义某个事件,然后订阅该事件并指定事件处理器,最后在某个时刻发布事件。例如下面的代码:

首先定义了一个事件处理器,专门用于处理 entitychangedeventdata<myentity> 事件。

public class myeventhandler : ilocaleventhandler<entitychangedeventdata<myentity>>
{
    public int entitychangedeventcount { get; set; }

    public task handleeventasync(entitychangedeventdata<myentity> eventdata)
    {
        entitychangedeventcount++;
        return task.completedtask;
    }
}
var handler = new myeventhandler();

localeventbus.subscribe<entitychangedeventdata<myentity>>(handler);

await localeventbus.publishasync(new entitycreatedeventdata<myentity>(new myentity()));

2.3.1 事件的订阅

可以看到,这里使用的是 ilocaleventbus 定义的订阅方法,跳转到内部实现,它还是调用的 eventbus 的方法。

public virtual idisposable subscribe<tevent>(ilocaleventhandler<tevent> handler) where tevent : class
{
    // 调用基类的 subscribe 方法,并传递 tevent 的类型,和事件处理器。
    return subscribe(typeof(tevent), handler);
}
public virtual idisposable subscribe(type eventtype, ieventhandler handler)
{
    return subscribe(eventtype, new singleinstancehandlerfactory(handler));
}

可以看到,这里传递了一个 singleinstancehandlerfactory 对象,这玩意儿是干嘛用的呢?从名字可以看出来,这是一个工厂,是用来创建 handler (事件处理器) 的工厂,并且是一个单实例的事件处理器工厂。

下面就是 ieventhandlerfactory 接口的定义,以及 singleinstancehandlerfactory 实现。

public interface ieventhandlerfactory
{
    // 获得一个事件处理器包装对象,即事件处理器执行完毕之后,可以调用
    // ieventhandlerdisposewrapper.dispose() 进行释放。
    ieventhandlerdisposewrapper gethandler();

    // 判断在已有的事件处理器工厂集合中,是否已经存在了相同的事件处理器。
    bool isinfactories(list<ieventhandlerfactory> handlerfactories);
}

public class singleinstancehandlerfactory : ieventhandlerfactory
{
    // 构造工厂时,传递的事件处理器实例。
    public ieventhandler handlerinstance { get; }


    public singleinstancehandlerfactory(ieventhandler handler)
    {
        handlerinstance = handler;
    }

    // 通过 eventhandlerdisposewrapper 包装事件处理器实例。
    public ieventhandlerdisposewrapper gethandler()
    {
        return new eventhandlerdisposewrapper(handlerinstance);
    }

    // 判断针对 handlerinstance 的事件处理器是否已经存在。
    public bool isinfactories(list<ieventhandlerfactory> handlerfactories)
    {
        return handlerfactories
            .oftype<singleinstancehandlerfactory>()
            .any(f => f.handlerinstance == handlerinstance);
    }
}

针对 ieventhandlerfactory 工厂,还拥有 3 个不同的实现,下表分别说明它们的应用场景。

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

实现类型 作用
ioceventhandlerfactory 每个工厂对应一个事件处理器的的类型,并通过 scopefactory 解析具体的事件处理器。生命周期由 scope 控制,当 scope 释放时,对应的事件处理器实例也会被销毁。
singleinstancehandlerfactory 每个工厂对应单独的一个事件处理器实例,事件处理器实例是由创建者控制的。
transienteventhandlerfactory 每个工厂对应一个事件处理器的类型,区别是它不由 ioc 解析实例,而是使用的 activator.createinstance() 方法构造实例,是一个瞬时对象,调用包装器的 dispose 即会进行释放。
transienteventhandlerfactory<thandler> 每个工厂对应指定的 thandler 事件处理器,生命周期同上面的工厂一样。

这几种工厂都是在订阅操作时,不同的订阅重载使用不同的工厂,或者是自己指定事件处理器的工厂均可。

public virtual idisposable subscribe<tevent, thandler>()
    where tevent : class
    where thandler : ieventhandler, new()
{
    return subscribe(typeof(tevent), new transienteventhandlerfactory<thandler>());
}

public virtual idisposable subscribe(type eventtype, ieventhandler handler)
{
    return subscribe(eventtype, new singleinstancehandlerfactory(handler));
}

不过有一种特殊的行为,开发人员可以 不用显式订阅。在 eventbus 类型中,定义了一个 subscribehandlers(itypelist<ieventhandler> handlers) 方法。该方法接收一个类型集合,通过遍历集合,从事件处理器的定义当中,取得事件处理器监听的事件类型 tevent

在取得了事件类型,并知晓了事件处理器类型以后,事件总线就可以订阅 tevent 类型的事件,并使用 ioceventhandlerfactory 工厂来构造事件处理器。

protected virtual void subscribehandlers(itypelist<ieventhandler> handlers)
{
    // 遍历事件处理器的类型,其实这里的就是模块启动时,传递给 xxxoptions 的集合。
    foreach (var handler in handlers)
    {
        // 获得事件处理器的所有接口定义,并遍历接口进行检查。
        var interfaces = handler.getinterfaces();
        foreach (var @interface in interfaces)
        {
            // 如果接口没有实现 ieventhandler 类型,则忽略。
            if (!typeof(ieventhandler).gettypeinfo().isassignablefrom(@interface))
            {
                continue;
            }

            // 从泛型参数当中,获得定义的事件类型。
            var genericargs = @interface.getgenericarguments();
            // 泛型参数完全匹配 1 时,才进行订阅操作。
            if (genericargs.length == 1)
            {
                subscribe(genericargs[0], new ioceventhandlerfactory(servicescopefactory, handler));
            }
        }
    }
}

这个订阅方法在 eventbus 当中是一个抽象方法,分别在本地事件总线和分布式事件总线有实现,这里我们首先讲解本地事件的逻辑。

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

public class localeventbus : eventbusbase, ilocaleventbus, isingletondependency
{
    protected concurrentdictionary<type, list<ieventhandlerfactory>> handlerfactories { get; }

    public localeventbus(
        ioptions<abplocaleventbusoptions> options,
        iservicescopefactory servicescopefactory)
        : base(servicescopefactory)
    {
        options = options.value;
        logger = nulllogger<localeventbus>.instance;

        handlerfactories = new concurrentdictionary<type, list<ieventhandlerfactory>>();

        // 调用父类的方法,将模块初始化时扫描到的事件处理器,都尝试进行订阅。
        subscribehandlers(options.handlers);
    }

    public override idisposable subscribe(type eventtype, ieventhandlerfactory factory)
    {
        getorcreatehandlerfactories(eventtype)
            // 锁住集合,以确保线程安全。
            .locking(factories =>
                {
                    // 如果在集合内部,已经有了对应的工厂,则不进行添加。
                    if (!factory.isinfactories(factories))
                    {
                        factories.add(factory);
                    }
                }
            );

        // 返回一个事件处理器工厂注销器,当调用 dispose() 方法时,会取消之前订阅的事件。
        return new eventhandlerfactoryunregistrar(this, eventtype, factory);
    }

    private list<ieventhandlerfactory> getorcreatehandlerfactories(type eventtype)
    {
        // 根据事件的类型,从字典中获得该类型的所有事件处理器工厂。
        return handlerfactories.getoradd(eventtype, (type) => new list<ieventhandlerfactory>());
    }
}

上述流程结合 eventbuslocaleventbus 讲解了事件的订阅流程,事件的订阅操作都是对 handlerfactories 的操作,往里面添加指定事件的事件处理器工厂,而每个工厂都是跟具体的事件处理器实例/类型进行关联的。

2.3.2 事件的发布

当开发人员需要发布事件的时候,一般都是通过对应的 eventbus,调用响应的 publishasync 方法,传递要触发的事件类型与事件数据。接口和基类当中,定义了两种发布方法的签名与实现:

public virtual task publishasync<tevent>(tevent eventdata) where tevent : class
{
    return publishasync(typeof(tevent), eventdata);
}

public abstract task publishasync(type eventtype, object eventdata);

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

第二种方法一共也分为本地事件总线的实现,和分布式事件总线的实现,本地事件比较简单,我们先分析本地事件总线的实现。

public override async task publishasync(type eventtype, object eventdata)
{
    // 定义了一个异常集合,用于接收多个事件处理器执行时,产生的所有异常。
    var exceptions = new list<exception>();

    // 触发事件处理器。
    await triggerhandlersasync(eventtype, eventdata, exceptions);

    // 如果有任何异常产生,则抛出到之前的调用栈。
    if (exceptions.any())
    {
        if (exceptions.count == 1)
        {
            exceptions[0].rethrow();
        }

        throw new aggregateexception("more than one error has occurred while triggering the event: " + eventtype, exceptions);
    }
}

可以看到真正的触发行为是在 triggerhandlersasync(type eventtype, object eventdata, list<exception> exceptions) 内部进行实现的。

protected virtual async task triggerhandlersasync(type eventtype, object eventdata, list<exception> exceptions)
{
    // 针对于这个的作用,等同于 configureawait(false) 。
    // 具体可以参考 https://blogs.msdn.microsoft.com/benwilli/2017/02/09/an-alternative-to-configureawaitfalse-everywhere/。
    await new synchronizationcontextremover();

    // 根据事件的类型,得到它的所有事件处理器工厂。
    foreach (var handlerfactories in gethandlerfactories(eventtype))
    {
        // 遍历所有的事件处理器工厂,通过 factory 获得事件处理器,调用 handler 的 handleeventasync 方法。
        foreach (var handlerfactory in handlerfactories.eventhandlerfactories)
        {
            await triggerhandlerasync(handlerfactory, handlerfactories.eventtype, eventdata, exceptions);
        }
    }

    // 如果类型继承了 ieventdatawithinheritablegenericargument 接口,那么会检测泛型参数是否有父类。
    // 如果有父类,则会使用当前的事件数据,为其父类发布一个事件。
    if (eventtype.gettypeinfo().isgenerictype &&
        eventtype.getgenericarguments().length == 1 &&
        typeof(ieventdatawithinheritablegenericargument).isassignablefrom(eventtype))
    {
        var genericarg = eventtype.getgenericarguments()[0];
        var basearg = genericarg.gettypeinfo().basetype;
        if (basearg != null)
        {
            // 构造基类的事件类型,使用当前一样的泛型定义,只是泛型参数使用基类。
            var baseeventtype = eventtype.getgenerictypedefinition().makegenerictype(basearg);
            // 构建类型的构造参数。
            var constructorargs = ((ieventdatawithinheritablegenericargument)eventdata).getconstructorargs();
            // 通过事件类型和构造参数,构造一个新的事件数据实例。
            var baseeventdata = activator.createinstance(baseeventtype, constructorargs);
            // 发布父类的同类事件。
            await publishasync(baseeventtype, baseeventdata);
        }
    }
}

在上述代码内部,都还没有真正执行事件处理器,真正的事件处理器执行程序是在下面的方法进行执行的。abp vnext 通过引入 ieventdatawithinheritablegenericargument 接口,实现了 类型继承事件 的触发,该接口提供了一个 getconstructorargs() 方法定义,方便后面生成构造参数。

例如有一个基础事件叫做 entityeventdata<student>,如果 student 继承自 person,那么在触发该事件的时候,也会发布一个 entityeventdata<person> 事件。

2.3.3 事件处理器的执行

真正事件处理器的执行,是通过下面的方法实现的,大概思路就是通过事件总线工厂,构建了事件处理器的实例。通过反射,调用事件处理器的 handleeventasync() 方法。如果在处理过程当中,出现了异常,则将异常数据放置在 list<exception> 集合当中。

protected virtual async task triggerhandlerasync(ieventhandlerfactory asynchandlerfactory, type eventtype, object eventdata, list<exception> exceptions)
{
    using (var eventhandlerwrapper = asynchandlerfactory.gethandler())
    {
        try
        {
            // 获得事件处理器的类型。
            var handlertype = eventhandlerwrapper.eventhandler.gettype();

            // 判断事件处理器是本地事件还是分布式事件。
            if (reflectionhelper.isassignabletogenerictype(handlertype, typeof(ilocaleventhandler<>)))
            {
                // 获得方法定义。
                var method = typeof(ilocaleventhandler<>)
                    .makegenerictype(eventtype)
                    .getmethod(
                        nameof(ilocaleventhandler<object>.handleeventasync),
                        new[] { eventtype }
                    );

                // 使用工厂创建的实例调用方法。
                await (task)method.invoke(eventhandlerwrapper.eventhandler, new[] { eventdata });
            }
            else if (reflectionhelper.isassignabletogenerictype(handlertype, typeof(idistributedeventhandler<>)))
            {
                var method = typeof(idistributedeventhandler<>)
                    .makegenerictype(eventtype)
                    .getmethod(
                        nameof(idistributedeventhandler<object>.handleeventasync),
                        new[] { eventtype }
                    );

                await (task)method.invoke(eventhandlerwrapper.eventhandler, new[] { eventdata });
            }
            else
            {
                // 如果都不是,则说明类型不正确,抛出异常。
                throw new abpexception("the object instance is not an event handler. object type: " + handlertype.assemblyqualifiedname);
            }
        }
        // 捕获到异常都统一添加到异常集合当中。
        catch (targetinvocationexception ex)
        {
            exceptions.add(ex.innerexception);
        }
        catch (exception ex)
        {
            exceptions.add(ex);
        }
    }
}

2.4 分布式事件总线

分布式事件总线的实现都存放在 volo.abp.eventbus.rabbitmq,该项目的代码比较少,由三个文件构成。

[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)

在 rabbitmq 模块的内部,只干了两件事情。首先从 json 配置文件当中,获取 abprabbitmqeventbusoptions 配置的三个参数,然后解析 rabbitmqdistributedeventbus 实例,并调用初始化方法 (initialize())。

[dependson(
    typeof(abpeventbusmodule),
    typeof(abprabbitmqmodule))]
public class abpeventbusrabbitmqmodule : abpmodule
{
    public override void configureservices(serviceconfigurationcontext context)
    {
        var configuration = context.services.getconfiguration();

        // 从配置文件读取配置。
        configure<abprabbitmqeventbusoptions>(configuration.getsection("rabbitmq:eventbus"));
    }

    public override void onapplicationinitialization(applicationinitializationcontext context)
    {
        // 调用初始化方法。
        context
            .serviceprovider
            .getrequiredservice<rabbitmqdistributedeventbus>()
            .initialize();
    }
}

2.4.1 分布式事件总线的初始化

public void initialize()
{
    // 创建一个消费者,并配置交换器和队列。
    consumer = messageconsumerfactory.create(
        new exchangedeclareconfiguration(
            abprabbitmqeventbusoptions.exchangename,
            type: "direct",
            durable: true
        ),
        new queuedeclareconfiguration(
            abprabbitmqeventbusoptions.clientname,
            durable: true,
            exclusive: false,
            autodelete: false
        ),
        abprabbitmqeventbusoptions.connectionname
    );

    // 消费者在消费消息的时候,具体的执行逻辑。
    consumer.onmessagereceived(processeventasync);

    // 调用基类的方法,自动订阅对应的事件。
    subscribehandlers(abpdistributedeventbusoptions.handlers);
}

2.4.2 分布式事件的订阅

在定义分布式事件的时候,我们必须使用 eventnameattribute 为事件声明路由键。

public override idisposable subscribe(type eventtype, ieventhandlerfactory factory)
{
    var handlerfactories = getorcreatehandlerfactories(eventtype);

    if (factory.isinfactories(handlerfactories))
    {
        return nulldisposable.instance;
    }

    handlerfactories.add(factory);

    if (handlerfactories.count == 1) //todo: multi-threading!
    {
        // 为消费者绑定一个路由键,在收到对应的事件时,就会触发之前绑定的方法。
        consumer.bindasync(eventnameattribute.getnameordefault(eventtype));
    }

    return new eventhandlerfactoryunregistrar(this, eventtype, factory);
}

订阅的时候,除了 consumer.bindasync() 以外,基本流程和本地事件总线基本一致。

2.4.3 分布式事件的发布

分布式事件总线一样重写了发布方法,内部首先使用 irabbitmqserializer 序列化器 (基于 json.net) 将事件数据进行序列化,然后将消息投递出去。

public override task publishasync(type eventtype, object eventdata)
{
    var eventname = eventnameattribute.getnameordefault(eventtype);
    // 序列化事件数据。
    var body = serializer.serialize(eventdata);

    // 创建一个信道用于通讯。
    using (var channel = connectionpool.get(abprabbitmqeventbusoptions.connectionname).createmodel())
    {
        channel.exchangedeclare(
            abprabbitmqeventbusoptions.exchangename,
            "direct",
            durable: true
        );
        
        // 更改投递模式为持久化模式。
        var properties = channel.createbasicproperties();
        properties.deliverymode = rabbitmqconsts.deliverymodes.persistent;

        // 发布一个新的事件。
        channel.basicpublish(
            exchange: abprabbitmqeventbusoptions.exchangename,
            routingkey: eventname,
            mandatory: true,
            basicproperties: properties,
            body: body
        );
    }

    return task.completedtask;
}

2.4.4 分布式事件的执行

执行逻辑都存放在 processeventasync(imodel channel, basicdelivereventargs ea) 方法内部,基本就是监听到指定的消息,首先反序列化消息,调用父类的 triggerhandlersasync 去执行具体的事件处理器。

private async task processeventasync(imodel channel, basicdelivereventargs ea)
{
    var eventname = ea.routingkey;
    var eventtype = eventtypes.getordefault(eventname);
    if (eventtype == null)
    {
        return;
    }

    var eventdata = serializer.deserialize(ea.body, eventtype);

    await triggerhandlersasync(eventtype, eventdata);
}

三、总结

abp vnext 为我们实现了比较完善的本地事件总线,和基于 rabbitmq 的分布式事件总线。在平时开发过程中,我们本地事件总线的使用频率应该还是比较高,而分布式事件总线目前仍处于一个半成品,很多高级特性还没实现,例如重试策略等。所以分布式事件总线要使用的话,建议使用较为成熟的 cap 库替代 abp vnext 的分布式事件总线。

四、其他

360 大病救助 : 在这里向大家求助一下,病人是我亲戚,情况属实。对于他们家庭来说,经济压力很大,希望大家能帮助或转发一下,谢谢大家。