[Abp vNext 源码分析] - 12. 后台作业与后台工作者
一、简要说明
文章信息:
基于的 abp vnext 版本:1.0.0
创作日期:2019 年 10 月 24 日晚
更新日期:暂无
abp vnext 提供了后台工作者和后台作业的支持,基本实现与原来的 abp 框架类似,并且 abp vnext 还提供了对 hangfire 和 rabbitmq 的后台作业集成。开发人员在使用这些第三方库的时候,基本就是开箱即用,不需要做其他复杂的配置。
后台作业在系统开发的过程当中,是比较常用的功能。因为总是有一些长耗时的任务,而这些任务我们不是立即响应的,例如 excel 文档导入、批量发送短信通知等。
后台工作者 的话,abp vnext 的实现就是在 clr 的 timer
之上封装了一层,周期性地执行用户逻辑。abp vnext 默认提供的 后台任务管理器,就是在后台工作者基础之上进行的封装。
涉及到后台任务、后台工作者的模块一共有 6 个,它们分别是:
-
volo.abp.threading :提供了一些常用的线程组件,其中
abptimer
就是在里面实现的。 volo.abp.backgroundworkers :后台工作者的定义和实现。
- volo.abp.backgroundjobs.abstractions :后台任务的一些共有定义。
- volo.abp.backgroundjobs :默认的后台任务管理器实现。
- volo.abp.backgroundjobs.hangfire :基于 hangfire 库实现的后台任务管理器。
volo.abp.backgroundjobs.rabbitmq : 基于 rabbitmq 实现的后台任务管理器。
二、源码分析
线程组件
健壮的计时器
clr 为我们提供了多种计时器,我们一般使用的是 system.threading.timer
,它是基于 clr 线程池的一个周期计时器,会根据我们配置的 period
(周期) 定时执行。在 clr 线程池中,所有的 timer
只有 1 个线程为其服务。这个线程直到下一个计时器的触发时间,当下一个 timer
对象到期时,这个线程就会将 timer
的回调方法通过 threadpool.queueuserworkitem()
扔到线程池去执行。
不过这带来了一个问题,即你的回调方法执行时间超过了计时器的周期,那么就会造成上一个任务还没执行完成又开始执行新的任务。
解决这个方法其实很简单,即启动之后,将周期设置为 timeout.infinite
,这样只会执行一次。当回调方法执行完成之后,就设置 duetime
参数说明下次执行要等待多久,并且周期还是 timeout.infinite
。
abp vnext 已经为我们提供了健壮的计时器,该类型的定义是 abptimer
,在内部用到了 volatile
关键字和 monitor
实现 条件变量模式 解决多线程环境下的问题。
public class abptimer : itransientdependency { // 回调事件。 public event eventhandler elapsed; // 执行周期。 public int period { get; set; } // 定时器启动之后就开始运行,默认为 fasle。 public bool runonstart { get; set; } // 日志记录器。 public ilogger<abptimer> logger { get; set; } private readonly timer _tasktimer; // 定时器是否在执行任务,默认为 false。 private volatile bool _performingtasks; // 定时器的运行状态,默认为 false。 private volatile bool _isrunning; public abptimer() { logger = nulllogger<abptimer>.instance; // 回调函数是 timercallback,执行周期为永不执行。 _tasktimer = new timer(timercallback, null, timeout.infinite, timeout.infinite); } public void start(cancellationtoken cancellationtoken = default) { // 如果传递的周期小于等于 0 ,则抛出异常。 if (period <= 0) { throw new abpexception("period should be set before starting the timer!"); } // 使用互斥锁,保证线程安全。 lock (_tasktimer) { // 如果启动之后就需要马上执行,则设置为 0,马上执行任务,否则会等待 period 毫秒之后再执行(1 个周期)。 _tasktimer.change(runonstart ? 0 : period, timeout.infinite); // 定时器成功运行了。 _isrunning = true; } // 释放 _tasktimer 的互斥锁。 } public void stop(cancellationtoken cancellationtoken = default) { // 使用互斥锁。 lock (_tasktimer) { // 将内部定时器设置为永不执行的状态。 _tasktimer.change(timeout.infinite, timeout.infinite); // 检测当前是否还有正在执行的任务,如果有则等待任务执行完成。 while (_performingtasks) { // 临时释放锁,阻塞当前线程。但是其他线程可以获取 _timer 的互斥锁。 monitor.wait(_tasktimer); } // 需要表示停止状态,所以标记状态为 false。 _isrunning = false; } } private void timercallback(object state) { lock (_tasktimer) { // 如果有任务正在运行,或者内部定时器已经停止了,则不做任何事情。 if (!_isrunning || _performingtasks) { return; } // 临时停止内部定时器。 _tasktimer.change(timeout.infinite, timeout.infinite); // 表明马上需要执行任务了。 _performingtasks = true; } try { // 调用绑定的事件。 elapsed.invokesafely(this, new eventargs()); } catch { // 注意,这里将会吞噬异常。 } finally { lock (_tasktimer) { // 任务执行完成,更改状态。 _performingtasks = false; // 如果定时器还在运行,没有被停止,则启动下一个 period 周期。 if (_isrunning) { _tasktimer.change(period, timeout.infinite); } // 解除因为释放锁而阻塞的线程。 // 如果已经调用了 stop,则会唤醒那个因为 wait 阻塞的线程,就会使 _isrunning 置为 false。 monitor.pulse(_tasktimer); } } } }
这里对 _performingtasks
和 _isrunning
字段设置为 volatile
防止指令重排和寄存器缓存。这是因为在 stop
方法内部使用到的 _performingtasks
可能会被优化,所以将该字段设置为了易失的。
irunnable
接口
abp vnext 为任务的启动和停止,抽象了一个 irunnable
接口。虽然描述说的是对线程的行为进行抽象,但千万千万不要手动调用 thread.abort()
。关于 thread.abort()
的坏处,这里不再多加赘述,可以参考 的描述,或者搜索其他的相关文章。
public interface irunnable { // 启动这个服务。 task startasync(cancellationtoken cancellationtoken = default); /// <summary> /// 停止这个服务。 /// </summary> task stopasync(cancellationtoken cancellationtoken = default); }
后台工作者
模块的构造
后台工作者的模块行为比较简单,它定义了在应用程序初始化和销毁时的行为。在初始化时,后台工作者管理器 获得所有 后台工作者,并开始启动它们。在销毁时,后台工作者管理器获得所有后台工作者,并开始停止他们,这样才能够做到优雅退出。
[dependson( typeof(abpthreadingmodule) )] public class abpbackgroundworkersmodule : abpmodule { public override void onapplicationinitialization(applicationinitializationcontext context) { var options = context.serviceprovider.getrequiredservice<ioptions<abpbackgroundworkeroptions>>().value; // 如果启用了后台工作者,那么获得后台工作者管理器的实例,并调用 startasync 启动所有后台工作者。 if (options.isenabled) { asynchelper.runsync( () => context.serviceprovider .getrequiredservice<ibackgroundworkermanager>() .startasync() ); } } public override void onapplicationshutdown(applicationshutdowncontext context) { var options = context.serviceprovider.getrequiredservice<ioptions<abpbackgroundworkeroptions>>().value; // 如果启用了后台工作者,那么获得后台工作者管理器的实例,并调用 stopasync 停止所有后台工作者。 if (options.isenabled) { asynchelper.runsync( () => context.serviceprovider .getrequiredservice<ibackgroundworkermanager>() .stopasync() ); } } }
后台工作者的定义
首先看看 ibackgroundworker
接口的定义,是空的。不过继承了 isingletondependency
接口,说明我们的每个后台工作者都是 单例 的。
/// <summary> /// 在后台运行,执行某些任务的工作程序(线程)的接口定义。 /// </summary> public interface ibackgroundworker : irunnable, isingletondependency { }
abp vnext 为我们定义了一个抽象的后台工作者类型 backgroundworkerbase
,这个基类的设计目的是提供一些常用组件(和 applicationservice
一样)。
public abstract class backgroundworkerbase : ibackgroundworker { //todo: add uow, localization and other useful properties..? //todo: 是否应该提供工作单元、本地化以及其他常用的属性? public ilogger<backgroundworkerbase> logger { protected get; set; } protected backgroundworkerbase() { logger = nulllogger<backgroundworkerbase>.instance; } public virtual task startasync(cancellationtoken cancellationtoken = default) { logger.logdebug("started background worker: " + tostring()); return task.completedtask; } public virtual task stopasync(cancellationtoken cancellationtoken = default) { logger.logdebug("stopped background worker: " + tostring()); return task.completedtask; } public override string tostring() { return gettype().fullname; } }
abp vnext 内部只有一个默认的后台工作者实现 periodicbackgroundworkerbase
。从名字上来看,意思是就是周期执行的后台工作者,内部就是用的 abptimer
来实现,abp vnext 将其包装起来是为了实现统一的模式(后台工作者)。
public abstract class periodicbackgroundworkerbase : backgroundworkerbase { protected readonly abptimer timer; // 也就意味着子类必须在其构造函数,指定 timer 的执行周期。 protected periodicbackgroundworkerbase(abptimer timer) { timer = timer; timer.elapsed += timer_elapsed; } // 启动后台工作者。 public override async task startasync(cancellationtoken cancellationtoken = default) { await base.startasync(cancellationtoken); timer.start(cancellationtoken); } // 停止后台工作者。 public override async task stopasync(cancellationtoken cancellationtoken = default) { timer.stop(cancellationtoken); await base.stopasync(cancellationtoken); } // timer 关联的周期事件,之所以不直接挂载 dowork,是为了捕获异常。 private void timer_elapsed(object sender, system.eventargs e) { try { dowork(); } catch (exception ex) { logger.logexception(ex); } } // 你要周期执行的任务。 protected abstract void dowork(); }
我们如果要实现自己的后台工作者,只需要继承该类,实现 dowork()
方法即可。
public class testworker : periodicbackgroundworkerbase { public testworker(abptimer timer) : base(timer) { // 每五分钟执行一次。 timer.period = (int)timespan.fromminutes(5).totalmilliseconds; } protected override void dowork() { console.writeline("后台工作者被执行了。"); } }
然后在我们自己模块的 onpreapplicationinitialization()
方法内解析出后台作业管理器(ibackgroundworkermanager
),调用它的 add()
方法,将我们定义的 testworker
添加到管理器当中即可。
后台工作者管理器
所有的后台工作者都是通过 ibackgroundworkermanager
进行管理的,它提供了 startasync()
、stopasync()
、add()
方法。前面两个方法就是 irunnable
接口定义的,后台工作者管理器直接集成了该接口,后面的 add()
方法就是用来动态添加我们的后台工作者。
后台工作者管理器的默认实现是 backgroundworkermanager
类型,它内部做的事情很简单,就是维护一个后台工作者集合。每当调用 startasync()
或 stopasync()
方法的时候,都从这个集合遍历后台工作者,执行他们的启动和停止方法。
这里值得注意的一点是,当我们调用 add()
方法添加了一个后台工作者之后,后台工作者管理器就会启动这个后台工作者。
public class backgroundworkermanager : ibackgroundworkermanager, isingletondependency, idisposable { protected bool isrunning { get; private set; } private bool _isdisposed; private readonly list<ibackgroundworker> _backgroundworkers; public backgroundworkermanager() { _backgroundworkers = new list<ibackgroundworker>(); } public virtual void add(ibackgroundworker worker) { _backgroundworkers.add(worker); // 如果当前后台工作者管理器还处于运行状态,则调用工作者的 startasync() 方法启动。 if (isrunning) { asynchelper.runsync( () => worker.startasync() ); } } public virtual void dispose() { if (_isdisposed) { return; } _isdisposed = true; //todo: ??? } // 启动,则遍历集合启动。 public virtual async task startasync(cancellationtoken cancellationtoken = default) { isrunning = true; foreach (var worker in _backgroundworkers) { await worker.startasync(cancellationtoken); } } // 停止, 则遍历集合停止。 public virtual async task stopasync(cancellationtoken cancellationtoken = default) { isrunning = false; foreach (var worker in _backgroundworkers) { await worker.stopasync(cancellationtoken); } } }
上述代码其实存在一个问题,即后台工作者被释放以后,是否还能执行 add()
操作。参考我 ,其实当对象被释放之后,就应该抛出 objectdisposedexception
异常。
后台作业
比起后台工作者,我们执行一次性任务的时候,一般会使用后台作业进行处理。比起只能设置固定周期的 periodicbackgroundworkerbase
,集成了 hangfire 的后台作业管理器,能够让我们使用 cron 表达式,更加灵活地设置任务的执行周期。
模块的构造
关于后台作业的模块,我们需要说道两处。第一处是位于 volo.abp.backgroundjobs.abstractions 项目的 abpbackgroundjobsabstractionsmodule
,第二出则是位于 volo.abp.backgroundjobs 项目的 abpbackgroundjobsmodule
。
abpbackgroundjobsabstractionsmodule
的主要行为是将符合条件的后台作业,添加到 abpbackgroundjoboptions
配置当中,以便后续进行使用。
public override void preconfigureservices(serviceconfigurationcontext context) { registerjobs(context.services); } private static void registerjobs(iservicecollection services) { var jobtypes = new list<type>(); // 如果注册的类型符合 ibackgroundjob<> 泛型,则添加到集合当中。 services.onregistred(context => { if (reflectionhelper.isassignabletogenerictype(context.implementationtype, typeof(ibackgroundjob<>))) { jobtypes.add(context.implementationtype); } }); services.configure<abpbackgroundjoboptions>(options => { // 将数据赋值给配置类。 foreach (var jobtype in jobtypes) { options.addjob(jobtype); } }); }
volo.abp.backgroundjobs 内部是 abp vnext 为我们提供的 默认后台作业管理器,这个后台作业管理器 本质上是一个后台工作者。
这个后台工作者会周期性(取决于 abpbackgroundjobworkeroptions.jobpollperiod
值,默认为 5 秒种)地从 ibackgroundjobstore
捞出一堆后台任务,并且在后台执行。至于每次执行多少个后台任务,这也取决于 abpbackgroundjobworkeroptions.maxjobfetchcount
的值,默认值是 1000 个。
注意:
这里的 options 类是
abpbackgroundjobworkeroptions
,别和abpbackgroundworkeroptions
混淆了。
所以在 abpbackgroundjobsmodule
模块里面,只做了一件事情,就是将负责后台作业的后台工作者,添加到后台工作者管理器种,并开始周期性地执行。
public override void onapplicationinitialization(applicationinitializationcontext context) { var options = context.serviceprovider.getrequiredservice<ioptions<abpbackgroundjoboptions>>().value; if (options.isjobexecutionenabled) { // 获得后台工作者管理器,并将负责后台作业的工作者添加进去。 context.serviceprovider .getrequiredservice<ibackgroundworkermanager>() .add(context.serviceprovider.getrequiredservice<ibackgroundjobworker>() ); } }
后台作业的定义
在上一节里面看到,只要是实现 ibackgroundjob<targs>
类型的都视为一个后台作业。这个后台作业接口,只定义了一个行为,那就是执行(execute(targs)
)。这里的 targs
泛型作为执行后台作业时,需要传递的参数类型。
// 因为是传入的参数,所以泛型参数是逆变的。 public interface ibackgroundjob<in targs> { void execute(targs args); }
检查源码,发现 abp vnext 的邮箱模块定义了一个邮件发送任务 backgroundemailsendingjob
,它的实现大概如下。
public class backgroundemailsendingjob : backgroundjob<backgroundemailsendingjobargs>, itransientdependency { // ... public override void execute(backgroundemailsendingjobargs args) { asynchelper.runsync(() => emailsender.sendasync(args.to, args.subject, args.body, args.isbodyhtml)); } }
后台作业管理器
后台作业都是通过一个后台作业管理器(ibackgroundjobmanager
)进行管理的,这个接口定义了一个入队方法(enqueueasync()
),注意,我们的后台作业在入队后,不是马上执行的。
说一下这个入队处理逻辑:
- 首先我们会通过参数的类型,获取到任务的名称。(假设任务上面没有标注
backgroundjobnameattribute
特性,那么任务的名称就是参数类型的fullname
。) - 构造一个
backgroundjobinfo
对象。 - 通过
ibackgroundjobstore
持久化任务信息。
public virtual async task<string> enqueueasync<targs>(targs args, backgroundjobpriority priority = backgroundjobpriority.normal, timespan? delay = null) { // 获取任务名称。 var jobname = backgroundjobnameattribute.getname<targs>(); var jobid = await enqueueasync(jobname, args, priority, delay); return jobid.tostring(); } protected virtual async task<guid> enqueueasync(string jobname, object args, backgroundjobpriority priority = backgroundjobpriority.normal, timespan? delay = null) { var jobinfo = new backgroundjobinfo { id = guidgenerator.create(), jobname = jobname, // 通过序列化器,序列化参数值,方便存储。这里内部其实使用的是 json.net 进行序列化。 jobargs = serializer.serialize(args), priority = priority, creationtime = clock.now, nexttrytime = clock.now }; // 如果任务有执行延迟,则任务的初始执行时间要加上这个延迟。 if (delay.hasvalue) { jobinfo.nexttrytime = clock.now.add(delay.value); } // 持久化任务信息,方便后面执行后台作业的工作者能够取到。 await store.insertasync(jobinfo); return jobinfo.id; }
backgroundjobnameattribute
相关的方法:
public static string getname<tjobargs>() { return getname(typeof(tjobargs)); } public static string getname([notnull] type jobargstype) { check.notnull(jobargstype, nameof(jobargstype)); // 判断参数类型上面是否标注了特性,并且特性实现了 ibackgroundjobnameprovider 接口。 return jobargstype .getcustomattributes(true) .oftype<ibackgroundjobnameprovider>() .firstordefault() ?.name // 拿不到名字,则使用类型的 fullname。 ?? jobargstype.fullname; }
后台作业的存储
后台作业的存储默认是放在内存的,这点可以从 inmemorybackgroundjobstore
类型实现看出来。在它的内部使用了一个并行字典,通过作业的 guid 与作业进行关联绑定。
除了内存实现,在 volo.abp.backgroundjobs.domain 模块还有一个 backgroundjobstore
实现,基本套路与 settingstore
一样,都是存储到数据库里面。
public class backgroundjobstore : ibackgroundjobstore, itransientdependency { protected ibackgroundjobrepository backgroundjobrepository { get; } // ... public backgroundjobinfo find(guid jobid) { return objectmapper.map<backgroundjobrecord, backgroundjobinfo>( backgroundjobrepository.find(jobid) ); } // ... public void insert(backgroundjobinfo jobinfo) { backgroundjobrepository.insert( objectmapper.map<backgroundjobinfo, backgroundjobrecord>(jobinfo) ); } // ... }
后台作业的执行
默认的后台作业管理器是通过一个后台工作者来执行后台任务的,这个实现叫做 backgroundjobworker
,这个后台工作者的生命周期也是单例的。后台作业的具体执行逻辑里面,涉及到了以下几个类型的交互。
类型 | 作用 |
---|---|
abpbackgroundjoboptions |
提供每个后台任务的配置信息,包括任务的类型、参数类型、任务名称数据。 |
abpbackgroundjobworkeroptions |
提供后台作业工作者的配置信息,例如每个周期 最大执行的作业数量、后台 工作者的 执行周期、作业执行 超时时间 等。 |
backgroundjobconfiguration |
后台任务的配置信息,作用是将持久化存储的作业信息与运行时类型进行绑定 和实例化,以便 abp vnext 来执行具体的任务。 |
ibackgroundjobexecuter |
后台作业的执行器,当我们从持久化存储获取到后台作业信息时,将会通过 这个执行器来执行具体的后台作业。 |
ibackgroundjobserializer |
后台作业序列化器,用于后台作业持久化时进行序列化的工具,默认采用的 是 json.net 进行实现。 |
jobexecutioncontext |
执行器在执行后台作业时,是通过这个上下文参数进行执行的,在这个上下 文内部,包含了后台作业的具体类型、后台作业的参数值。 |
ibackgroundjobstore |
前面已经讲过了,这个是用于后台作业的持久化存储,默认实现是存储在内存。 |
backgroundjobpriority |
后台作业的执行优先级定义,abp vnext 在执行后台任务时,会根据任务的优 先级进行排序,以便在后面执行的时候优先级高的任务先执行。 |
我们来按照逻辑顺序走一遍它的实现,首先后台作业的执行工作者会从持久化存储内,获取 maxjobfetchcount
个任务用于执行。从持久化存储获取后台作业信息(backgroundjobinfo
),是由 ibackgroundjobstore
提供的。
var store = scope.serviceprovider.getrequiredservice<ibackgroundjobstore>(); var waitingjobs = store.getwaitingjobs(workeroptions.maxjobfetchcount); // 不存在任何后台作业,则直接结束本次调用。 if (!waitingjobs.any()) { return; }
inmemorybackgroundjobstore
的相关实现:
public list<backgroundjobinfo> getwaitingjobs(int maxresultcount) { return _jobs.values .where(t => !t.isabandoned && t.nexttrytime <= clock.now) .orderbydescending(t => t.priority) .thenby(t => t.trycount) .thenby(t => t.nexttrytime) .take(maxresultcount) .tolist(); }
上面的代码可以看出来,首先排除 被放弃的任务 ,包含达到执行时间的任务,然后根据任务的优先级从高到低进行排序。重试次数少的优先执行,预计执行时间越早的越先执行。最后从这些数据中,筛选出 maxresultcount
结果并返回。
说到这里,我们来看一下这个 nexttrytime
是如何被计算出来的?回想起最开始的后台作业管理器,我们在添加一个后台任务的时候,就会设置这个后台任务的 预计执行时间。第一个任务被添加到执行队列中时,它的值一般是 clock.now
,也就是它被添加到队列的时间。
不过 abp vnext 为了让那些经常执行失败的任务,有比较低的优先级再执行,就在每次任务执行失败之后,会将 nexttrytime
的值指数级进行增加。这块代码可以在 calculatenexttrytime
里面看到,也就是说某个任务的执行 失败次数越高,那么它下一次的预期执行时间就会越远。
protected virtual datetime? calculatenexttrytime(backgroundjobinfo jobinfo, iclock clock) { // 一般来说,这个 defaultwaitfactor 因子的值是 2.0 。 var nextwaitduration = workeroptions.defaultfirstwaitduration * (math.pow(workeroptions.defaultwaitfactor, jobinfo.trycount - 1)); // 同执行失败的次数进行挂钩。 var nexttrydate = jobinfo.lasttrytime?.addseconds(nextwaitduration) ?? clock.now.addseconds(nextwaitduration); if (nexttrydate.subtract(jobinfo.creationtime).totalseconds > workeroptions.defaulttimeout) { return null; } return nexttrydate; }
当预期的执行时间都超过 defaulttimeout
的超时时间时(默认为 2 天),说明这个任务确实没救了,就不要再执行了。
我们之前说到,从 ibackgroundjobstore
拿到了需要执行的后台任务信息集合,接下来我们就要开始执行后台任务了。
foreach (var jobinfo in waitingjobs) { jobinfo.trycount++; jobinfo.lasttrytime = clock.now; try { // 根据任务名称获取任务的配置参数。 var jobconfiguration = joboptions.getjob(jobinfo.jobname); // 根据配置里面存储的任务类型,将参数值进行反序列化。 var jobargs = serializer.deserialize(jobinfo.jobargs, jobconfiguration.argstype); // 构造一个新的执行上下文,让执行器执行任务。 var context = new jobexecutioncontext(scope.serviceprovider, jobconfiguration.jobtype, jobargs); try { jobexecuter.execute(context); // 如果任务执行成功则删除该任务。 store.delete(jobinfo.id); } catch (backgroundjobexecutionexception) { // 发生任务执行失败异常时,根据指定的公式计算下一次的执行时间。 var nexttrytime = calculatenexttrytime(jobinfo, clock); if (nexttrytime.hasvalue) { jobinfo.nexttrytime = nexttrytime.value; } else { // 超过超时时间的时候,公式计算函数返回 null,该任务置为废弃任务。 jobinfo.isabandoned = true; } tryupdate(store, jobinfo); } } catch (exception ex) { // 执行过程中,产生了未知异常,设置为废弃任务,并打印日志。 logger.logexception(ex); jobinfo.isabandoned = true; tryupdate(store, jobinfo); } }
执行后台任务的时候基本分为 5 步,它们分别是:
- 获得任务关联的配置参数,默认不用提供,因为在之前模块初始化的时候就已经配置了(你也可以显式指定)。
- 通过之前存储的配置参数,将参数值反序列化出来,构造具体实例。
- 构造一个执行上下文。
- 后台任务执行器执行具体的后台任务。
- 成功则删除任务,失败则更新任务下次的执行状态。
至于执行器里面的真正执行操作,你都拿到了参数值和任务类型了。就可以通过类型用 ioc 获取后台任务对象的实例,然后通过反射匹配方法签名,在实例上调用这个方法传入参数即可。
public virtual void execute(jobexecutioncontext context) { // 构造具体的后台作业实例对象。 var job = context.serviceprovider.getservice(context.jobtype); if (job == null) { throw new abpexception("the job type is not registered to di: " + context.jobtype); } // 获得需要执行的方法签名。 var jobexecutemethod = context.jobtype.getmethod(nameof(ibackgroundjob<object>.execute)); if (jobexecutemethod == null) { throw new abpexception($"given job type does not implement {typeof(ibackgroundjob<>).name}. the job type was: " + context.jobtype); } try { // 直接通过 methodinfo 的 invoke 方法调用,传入具体的实例对象和参数值即可。 jobexecutemethod.invoke(job, new[] { context.jobargs }); } catch (exception ex) { logger.logexception(ex); // 如果是执行方法内的异常,则包装进行处理,然后抛出。 throw new backgroundjobexecutionexception("a background job execution is failed. see inner exception for details.", ex) { jobtype = context.jobtype.assemblyqualifiedname, jobargs = context.jobargs }; } }
集成 hangfire
abp vnext 对于 hangfire 的集成代码分布在 volo.abp.hangfire 和 volo.abp.backgroundjobs.hangfire 模块内部,前者是在模块配置里面,调用 hangfire 库的相关方法,注入组件到 ioc 容器当中。后者则是对后台作业进行了适配处理,替换了默认的 ibackgroundjobmanager
实现。
在 abphangfiremodule
模块内部,通过工厂创建出来一个 backgroudjobserver
实例,并将它的生命周期与应用程序的生命周期进行绑定,以便进行销毁处理。
public class abphangfiremodule : abpmodule { private backgroundjobserver _backgroundjobserver; public override void configureservices(serviceconfigurationcontext context) { context.services.addhangfire(configuration => { context.services.executepreconfiguredactions(configuration); }); } public override void onapplicationinitialization(applicationinitializationcontext context) { var options = context.serviceprovider.getrequiredservice<ioptions<abphangfireoptions>>().value; _backgroundjobserver = options.backgroundjobserverfactory.invoke(context.serviceprovider); } public override void onapplicationshutdown(applicationshutdowncontext context) { //todo: abp may provide two methods for application shutdown: onpreapplicationshutdown & onapplicationshutdown _backgroundjobserver.sendstop(); _backgroundjobserver.dispose(); } }
我们直奔主题,看一下基于 hangfire 的后台作业管理器是怎么实现的。
public class hangfirebackgroundjobmanager : ibackgroundjobmanager, itransientdependency { public task<string> enqueueasync<targs>(targs args, backgroundjobpriority priority = backgroundjobpriority.normal, timespan? delay = null) { // 如果没有延迟参数,则直接通过 enqueue() 方法扔进执行对了。 if (!delay.hasvalue) { return task.fromresult( backgroundjob.enqueue<hangfirejobexecutionadapter<targs>>( adapter => adapter.execute(args) ) ); } else { return task.fromresult( backgroundjob.schedule<hangfirejobexecutionadapter<targs>>( adapter => adapter.execute(args), delay.value ) ); } }
上述代码中使用 hangfirejobexecutionadapter
进行了一个适配操作,因为 hangfire 要将一个后台任务扔进队列执行,不是用 targs
就能解决的。
转到这个适配器定义,提供了一个 execute(targs)
方法,当被添加到 hangfire 队列执行的时候。实际 hangfire 会调用适配器的 excetue(targs)
方法,然后内部还是使用的 ibackgroundjobexecuter
来执行具体定义的任务。
public class hangfirejobexecutionadapter<targs> { protected abpbackgroundjoboptions options { get; } protected iservicescopefactory servicescopefactory { get; } protected ibackgroundjobexecuter jobexecuter { get; } public hangfirejobexecutionadapter( ioptions<abpbackgroundjoboptions> options, ibackgroundjobexecuter jobexecuter, iservicescopefactory servicescopefactory) { jobexecuter = jobexecuter; servicescopefactory = servicescopefactory; options = options.value; } public void execute(targs args) { using (var scope = servicescopefactory.createscope()) { var jobtype = options.getjob(typeof(targs)).jobtype; var context = new jobexecutioncontext(scope.serviceprovider, jobtype, args); jobexecuter.execute(context); } } }
集成 rabbitmq
基于 rabbitmq 的后台作业实现,我想放在分布式事件总线里面,对其一起进行讲解。
三、总结
abp vnext 为我们提供了多种后台作业管理器的实现,你可以根据自己的需求选用不同的后台作业管理器,又或者是自己动手造*。
需要看其他的 abp vnext 相关文章? 即可跳转到总目录。
上一篇: 在线转换服务