[Abp vNext 源码分析] - 13. 本地事件总线与分布式事件总线 (Rabbit MQ)
一、简要介绍
abp vnext 封装了两种事件总线结构,第一种是 abp vnext 自己实现的本地事件总线,这种事件总线无法跨项目发布和订阅。第二种则是分布式事件总线,abp vnext 自己封装了一个抽象层进行定义,并使用 rabbitmq 编写了一个基本实现。
在使用方式上,两种事件总线的作用基本相同。
事件总线分布在两个模块,在 volo.abp.eventbus 模块内部,定义了事件总线的抽象接口,以及本地事件总线 (ilocaleventbus
) 的实现。分布式事件总线的具体实现,是在 volo.abp.eventbus.rabbitmq 模块内部进行定义,从项目名称可以看出来,这个模块是基于 rabbitmq 消息队列实现的。
但是该项目并不是直接引用 rabbitmq.client 包,而是在 volo.abp.rabbitmq 项目内部引用。这是因为除了分布式事件总线以外,abp 还基于 rabbitmq 实现了一个后台作业管理器。
abp vnext 框架便将一些对象抽象出来,放在 volo.abp.rabbitmq 项目内部进行定义和实现。
二、源码分析
2.1 事件处理器的注册
分析源码,首先从一个项目的模块开始,volo.abp.eventbus 库的模块 abpeventbusmodule
只干了一件事情。在组件注册的时候,根据组件的实现接口 (ilocaleventhandler
或 idistributedeventhandler
) 不同,将其赋值给 abplocaleventbusoptions
与 abpdistributedeventbusoptions
的 handlers
属性。
也就是说,开发人员定义的事件处理程序 (handler
) 都会在依赖注入的时候,都会将其类型 (type
) 添加到事件总线的配置类当中,方便后续进行使用。
2.2 事件总线的接口
通过事件总线模块的单元测试我们可以知道,事件的发布与订阅都是通过 ieventbus
的两个子接口 (ilocaleventbus
/idistributedeventbus
) 进行的。在 ieventbus
接口的定义中,有三种行为,分别是 发布、订阅、取消订阅。
对于 ilocaleventbus
接口和 idistributedeventbus
接口来说,它们都提供了一个,针对本地事件处理器和分布式处理器的特殊订阅方法。
ilocaleventbus
:
/// <summary> /// defines interface of the event bus. /// </summary> public interface ilocaleventbus : ieventbus { /// <summary> /// registers to an event. /// same (given) instance of the handler is used for all event occurrences. /// </summary> /// <typeparam name="tevent">event type</typeparam> /// <param name="handler">object to handle the event</param> idisposable subscribe<tevent>(ilocaleventhandler<tevent> handler) where tevent : class; }
idistributedeventbus
:
public interface idistributedeventbus : ieventbus { /// <summary> /// registers to an event. /// same (given) instance of the handler is used for all event occurrences. /// </summary> /// <typeparam name="tevent">event type</typeparam> /// <param name="handler">object to handle the event</param> idisposable subscribe<tevent>(idistributedeventhandler<tevent> handler) where tevent : class; }
2.3 事件总线基本流程和实现
同其他模块一样,因为有分布式事件总线和本地事件总线,abp vnext 同样抽象了一个 eventbusbase
类型,作为它们的基类实现。
一般的流程,我们是先定义某个事件,然后订阅该事件并指定事件处理器,最后在某个时刻发布事件。例如下面的代码:
首先定义了一个事件处理器,专门用于处理 entitychangedeventdata<myentity>
事件。
public class myeventhandler : ilocaleventhandler<entitychangedeventdata<myentity>> { public int entitychangedeventcount { get; set; } public task handleeventasync(entitychangedeventdata<myentity> eventdata) { entitychangedeventcount++; return task.completedtask; } }
var handler = new myeventhandler(); localeventbus.subscribe<entitychangedeventdata<myentity>>(handler); await localeventbus.publishasync(new entitycreatedeventdata<myentity>(new myentity()));
2.3.1 事件的订阅
可以看到,这里使用的是 ilocaleventbus
定义的订阅方法,跳转到内部实现,它还是调用的 eventbus
的方法。
public virtual idisposable subscribe<tevent>(ilocaleventhandler<tevent> handler) where tevent : class { // 调用基类的 subscribe 方法,并传递 tevent 的类型,和事件处理器。 return subscribe(typeof(tevent), handler); }
public virtual idisposable subscribe(type eventtype, ieventhandler handler) { return subscribe(eventtype, new singleinstancehandlerfactory(handler)); }
可以看到,这里传递了一个 singleinstancehandlerfactory
对象,这玩意儿是干嘛用的呢?从名字可以看出来,这是一个工厂,是用来创建 handler (事件处理器) 的工厂,并且是一个单实例的事件处理器工厂。
下面就是 ieventhandlerfactory
接口的定义,以及 singleinstancehandlerfactory
实现。
public interface ieventhandlerfactory { // 获得一个事件处理器包装对象,即事件处理器执行完毕之后,可以调用 // ieventhandlerdisposewrapper.dispose() 进行释放。 ieventhandlerdisposewrapper gethandler(); // 判断在已有的事件处理器工厂集合中,是否已经存在了相同的事件处理器。 bool isinfactories(list<ieventhandlerfactory> handlerfactories); } public class singleinstancehandlerfactory : ieventhandlerfactory { // 构造工厂时,传递的事件处理器实例。 public ieventhandler handlerinstance { get; } public singleinstancehandlerfactory(ieventhandler handler) { handlerinstance = handler; } // 通过 eventhandlerdisposewrapper 包装事件处理器实例。 public ieventhandlerdisposewrapper gethandler() { return new eventhandlerdisposewrapper(handlerinstance); } // 判断针对 handlerinstance 的事件处理器是否已经存在。 public bool isinfactories(list<ieventhandlerfactory> handlerfactories) { return handlerfactories .oftype<singleinstancehandlerfactory>() .any(f => f.handlerinstance == handlerinstance); } }
针对 ieventhandlerfactory
工厂,还拥有 3 个不同的实现,下表分别说明它们的应用场景。
实现类型 | 作用 |
---|---|
ioceventhandlerfactory |
每个工厂对应一个事件处理器的的类型,并通过 scopefactory 解析具体的事件处理器。生命周期由 scope 控制,当 scope 释放时,对应的事件处理器实例也会被销毁。 |
singleinstancehandlerfactory |
每个工厂对应单独的一个事件处理器实例,事件处理器实例是由创建者控制的。 |
transienteventhandlerfactory |
每个工厂对应一个事件处理器的类型,区别是它不由 ioc 解析实例,而是使用的 activator.createinstance() 方法构造实例,是一个瞬时对象,调用包装器的 dispose 即会进行释放。 |
transienteventhandlerfactory<thandler> |
每个工厂对应指定的 thandler 事件处理器,生命周期同上面的工厂一样。 |
这几种工厂都是在订阅操作时,不同的订阅重载使用不同的工厂,或者是自己指定事件处理器的工厂均可。
public virtual idisposable subscribe<tevent, thandler>() where tevent : class where thandler : ieventhandler, new() { return subscribe(typeof(tevent), new transienteventhandlerfactory<thandler>()); } public virtual idisposable subscribe(type eventtype, ieventhandler handler) { return subscribe(eventtype, new singleinstancehandlerfactory(handler)); }
不过有一种特殊的行为,开发人员可以 不用显式订阅。在 eventbus
类型中,定义了一个 subscribehandlers(itypelist<ieventhandler> handlers)
方法。该方法接收一个类型集合,通过遍历集合,从事件处理器的定义当中,取得事件处理器监听的事件类型 tevent
。
在取得了事件类型,并知晓了事件处理器类型以后,事件总线就可以订阅 tevent
类型的事件,并使用 ioceventhandlerfactory
工厂来构造事件处理器。
protected virtual void subscribehandlers(itypelist<ieventhandler> handlers) { // 遍历事件处理器的类型,其实这里的就是模块启动时,传递给 xxxoptions 的集合。 foreach (var handler in handlers) { // 获得事件处理器的所有接口定义,并遍历接口进行检查。 var interfaces = handler.getinterfaces(); foreach (var @interface in interfaces) { // 如果接口没有实现 ieventhandler 类型,则忽略。 if (!typeof(ieventhandler).gettypeinfo().isassignablefrom(@interface)) { continue; } // 从泛型参数当中,获得定义的事件类型。 var genericargs = @interface.getgenericarguments(); // 泛型参数完全匹配 1 时,才进行订阅操作。 if (genericargs.length == 1) { subscribe(genericargs[0], new ioceventhandlerfactory(servicescopefactory, handler)); } } } }
这个订阅方法在 eventbus
当中是一个抽象方法,分别在本地事件总线和分布式事件总线有实现,这里我们首先讲解本地事件的逻辑。
public class localeventbus : eventbusbase, ilocaleventbus, isingletondependency { protected concurrentdictionary<type, list<ieventhandlerfactory>> handlerfactories { get; } public localeventbus( ioptions<abplocaleventbusoptions> options, iservicescopefactory servicescopefactory) : base(servicescopefactory) { options = options.value; logger = nulllogger<localeventbus>.instance; handlerfactories = new concurrentdictionary<type, list<ieventhandlerfactory>>(); // 调用父类的方法,将模块初始化时扫描到的事件处理器,都尝试进行订阅。 subscribehandlers(options.handlers); } public override idisposable subscribe(type eventtype, ieventhandlerfactory factory) { getorcreatehandlerfactories(eventtype) // 锁住集合,以确保线程安全。 .locking(factories => { // 如果在集合内部,已经有了对应的工厂,则不进行添加。 if (!factory.isinfactories(factories)) { factories.add(factory); } } ); // 返回一个事件处理器工厂注销器,当调用 dispose() 方法时,会取消之前订阅的事件。 return new eventhandlerfactoryunregistrar(this, eventtype, factory); } private list<ieventhandlerfactory> getorcreatehandlerfactories(type eventtype) { // 根据事件的类型,从字典中获得该类型的所有事件处理器工厂。 return handlerfactories.getoradd(eventtype, (type) => new list<ieventhandlerfactory>()); } }
上述流程结合 eventbus
和 localeventbus
讲解了事件的订阅流程,事件的订阅操作都是对 handlerfactories
的操作,往里面添加指定事件的事件处理器工厂,而每个工厂都是跟具体的事件处理器实例/类型进行关联的。
2.3.2 事件的发布
当开发人员需要发布事件的时候,一般都是通过对应的 eventbus
,调用响应的 publishasync
方法,传递要触发的事件类型与事件数据。接口和基类当中,定义了两种发布方法的签名与实现:
public virtual task publishasync<tevent>(tevent eventdata) where tevent : class { return publishasync(typeof(tevent), eventdata); } public abstract task publishasync(type eventtype, object eventdata);
第二种方法一共也分为本地事件总线的实现,和分布式事件总线的实现,本地事件比较简单,我们先分析本地事件总线的实现。
public override async task publishasync(type eventtype, object eventdata) { // 定义了一个异常集合,用于接收多个事件处理器执行时,产生的所有异常。 var exceptions = new list<exception>(); // 触发事件处理器。 await triggerhandlersasync(eventtype, eventdata, exceptions); // 如果有任何异常产生,则抛出到之前的调用栈。 if (exceptions.any()) { if (exceptions.count == 1) { exceptions[0].rethrow(); } throw new aggregateexception("more than one error has occurred while triggering the event: " + eventtype, exceptions); } }
可以看到真正的触发行为是在 triggerhandlersasync(type eventtype, object eventdata, list<exception> exceptions)
内部进行实现的。
protected virtual async task triggerhandlersasync(type eventtype, object eventdata, list<exception> exceptions) { // 针对于这个的作用,等同于 configureawait(false) 。 // 具体可以参考 https://blogs.msdn.microsoft.com/benwilli/2017/02/09/an-alternative-to-configureawaitfalse-everywhere/。 await new synchronizationcontextremover(); // 根据事件的类型,得到它的所有事件处理器工厂。 foreach (var handlerfactories in gethandlerfactories(eventtype)) { // 遍历所有的事件处理器工厂,通过 factory 获得事件处理器,调用 handler 的 handleeventasync 方法。 foreach (var handlerfactory in handlerfactories.eventhandlerfactories) { await triggerhandlerasync(handlerfactory, handlerfactories.eventtype, eventdata, exceptions); } } // 如果类型继承了 ieventdatawithinheritablegenericargument 接口,那么会检测泛型参数是否有父类。 // 如果有父类,则会使用当前的事件数据,为其父类发布一个事件。 if (eventtype.gettypeinfo().isgenerictype && eventtype.getgenericarguments().length == 1 && typeof(ieventdatawithinheritablegenericargument).isassignablefrom(eventtype)) { var genericarg = eventtype.getgenericarguments()[0]; var basearg = genericarg.gettypeinfo().basetype; if (basearg != null) { // 构造基类的事件类型,使用当前一样的泛型定义,只是泛型参数使用基类。 var baseeventtype = eventtype.getgenerictypedefinition().makegenerictype(basearg); // 构建类型的构造参数。 var constructorargs = ((ieventdatawithinheritablegenericargument)eventdata).getconstructorargs(); // 通过事件类型和构造参数,构造一个新的事件数据实例。 var baseeventdata = activator.createinstance(baseeventtype, constructorargs); // 发布父类的同类事件。 await publishasync(baseeventtype, baseeventdata); } } }
在上述代码内部,都还没有真正执行事件处理器,真正的事件处理器执行程序是在下面的方法进行执行的。abp vnext 通过引入 ieventdatawithinheritablegenericargument
接口,实现了 类型继承事件 的触发,该接口提供了一个 getconstructorargs()
方法定义,方便后面生成构造参数。
例如有一个基础事件叫做 entityeventdata<student>
,如果 student
继承自 person
,那么在触发该事件的时候,也会发布一个 entityeventdata<person>
事件。
2.3.3 事件处理器的执行
真正事件处理器的执行,是通过下面的方法实现的,大概思路就是通过事件总线工厂,构建了事件处理器的实例。通过反射,调用事件处理器的 handleeventasync()
方法。如果在处理过程当中,出现了异常,则将异常数据放置在 list<exception>
集合当中。
protected virtual async task triggerhandlerasync(ieventhandlerfactory asynchandlerfactory, type eventtype, object eventdata, list<exception> exceptions) { using (var eventhandlerwrapper = asynchandlerfactory.gethandler()) { try { // 获得事件处理器的类型。 var handlertype = eventhandlerwrapper.eventhandler.gettype(); // 判断事件处理器是本地事件还是分布式事件。 if (reflectionhelper.isassignabletogenerictype(handlertype, typeof(ilocaleventhandler<>))) { // 获得方法定义。 var method = typeof(ilocaleventhandler<>) .makegenerictype(eventtype) .getmethod( nameof(ilocaleventhandler<object>.handleeventasync), new[] { eventtype } ); // 使用工厂创建的实例调用方法。 await (task)method.invoke(eventhandlerwrapper.eventhandler, new[] { eventdata }); } else if (reflectionhelper.isassignabletogenerictype(handlertype, typeof(idistributedeventhandler<>))) { var method = typeof(idistributedeventhandler<>) .makegenerictype(eventtype) .getmethod( nameof(idistributedeventhandler<object>.handleeventasync), new[] { eventtype } ); await (task)method.invoke(eventhandlerwrapper.eventhandler, new[] { eventdata }); } else { // 如果都不是,则说明类型不正确,抛出异常。 throw new abpexception("the object instance is not an event handler. object type: " + handlertype.assemblyqualifiedname); } } // 捕获到异常都统一添加到异常集合当中。 catch (targetinvocationexception ex) { exceptions.add(ex.innerexception); } catch (exception ex) { exceptions.add(ex); } } }
2.4 分布式事件总线
分布式事件总线的实现都存放在 volo.abp.eventbus.rabbitmq,该项目的代码比较少,由三个文件构成。
在 rabbitmq 模块的内部,只干了两件事情。首先从 json 配置文件当中,获取 abprabbitmqeventbusoptions
配置的三个参数,然后解析 rabbitmqdistributedeventbus
实例,并调用初始化方法 (initialize()
)。
[dependson( typeof(abpeventbusmodule), typeof(abprabbitmqmodule))] public class abpeventbusrabbitmqmodule : abpmodule { public override void configureservices(serviceconfigurationcontext context) { var configuration = context.services.getconfiguration(); // 从配置文件读取配置。 configure<abprabbitmqeventbusoptions>(configuration.getsection("rabbitmq:eventbus")); } public override void onapplicationinitialization(applicationinitializationcontext context) { // 调用初始化方法。 context .serviceprovider .getrequiredservice<rabbitmqdistributedeventbus>() .initialize(); } }
2.4.1 分布式事件总线的初始化
public void initialize() { // 创建一个消费者,并配置交换器和队列。 consumer = messageconsumerfactory.create( new exchangedeclareconfiguration( abprabbitmqeventbusoptions.exchangename, type: "direct", durable: true ), new queuedeclareconfiguration( abprabbitmqeventbusoptions.clientname, durable: true, exclusive: false, autodelete: false ), abprabbitmqeventbusoptions.connectionname ); // 消费者在消费消息的时候,具体的执行逻辑。 consumer.onmessagereceived(processeventasync); // 调用基类的方法,自动订阅对应的事件。 subscribehandlers(abpdistributedeventbusoptions.handlers); }
2.4.2 分布式事件的订阅
在定义分布式事件的时候,我们必须使用 eventnameattribute
为事件声明路由键。
public override idisposable subscribe(type eventtype, ieventhandlerfactory factory) { var handlerfactories = getorcreatehandlerfactories(eventtype); if (factory.isinfactories(handlerfactories)) { return nulldisposable.instance; } handlerfactories.add(factory); if (handlerfactories.count == 1) //todo: multi-threading! { // 为消费者绑定一个路由键,在收到对应的事件时,就会触发之前绑定的方法。 consumer.bindasync(eventnameattribute.getnameordefault(eventtype)); } return new eventhandlerfactoryunregistrar(this, eventtype, factory); }
订阅的时候,除了 consumer.bindasync()
以外,基本流程和本地事件总线基本一致。
2.4.3 分布式事件的发布
分布式事件总线一样重写了发布方法,内部首先使用 irabbitmqserializer
序列化器 (基于 json.net) 将事件数据进行序列化,然后将消息投递出去。
public override task publishasync(type eventtype, object eventdata) { var eventname = eventnameattribute.getnameordefault(eventtype); // 序列化事件数据。 var body = serializer.serialize(eventdata); // 创建一个信道用于通讯。 using (var channel = connectionpool.get(abprabbitmqeventbusoptions.connectionname).createmodel()) { channel.exchangedeclare( abprabbitmqeventbusoptions.exchangename, "direct", durable: true ); // 更改投递模式为持久化模式。 var properties = channel.createbasicproperties(); properties.deliverymode = rabbitmqconsts.deliverymodes.persistent; // 发布一个新的事件。 channel.basicpublish( exchange: abprabbitmqeventbusoptions.exchangename, routingkey: eventname, mandatory: true, basicproperties: properties, body: body ); } return task.completedtask; }
2.4.4 分布式事件的执行
执行逻辑都存放在 processeventasync(imodel channel, basicdelivereventargs ea)
方法内部,基本就是监听到指定的消息,首先反序列化消息,调用父类的 triggerhandlersasync
去执行具体的事件处理器。
private async task processeventasync(imodel channel, basicdelivereventargs ea) { var eventname = ea.routingkey; var eventtype = eventtypes.getordefault(eventname); if (eventtype == null) { return; } var eventdata = serializer.deserialize(ea.body, eventtype); await triggerhandlersasync(eventtype, eventdata); }
三、总结
abp vnext 为我们实现了比较完善的本地事件总线,和基于 rabbitmq 的分布式事件总线。在平时开发过程中,我们本地事件总线的使用频率应该还是比较高,而分布式事件总线目前仍处于一个半成品,很多高级特性还没实现,例如重试策略等。所以分布式事件总线要使用的话,建议使用较为成熟的 cap 库替代 abp vnext 的分布式事件总线。
四、其他
360 大病救助 : 在这里向大家求助一下,病人是我亲戚,情况属实。对于他们家庭来说,经济压力很大,希望大家能帮助或转发一下,谢谢大家。
上一篇: 这是亲妈
下一篇: Glide源码解析一,初始化