[Abp 源码分析]十六、后台作业与后台工作者
0. 简介
在某些时候我们可能会需要执行后台任务,或者是执行一些周期性的任务。比如说可能每隔 1 个小时要清除某个临时文件夹内的数据,可能用户会要针对某一个用户群来群发一组短信。前面这些就是典型的应用场景,在 abp 框架里面为我们准备了后台作业和后台工作者来帮助我们解决这个问题。
后台作业与后台工作者的区别是,前者主要用于某些耗时较长的任务,而不想阻塞用户的时候所使用。后者主要用于周期性的执行某些任务,从 “工作者” 的名字可以看出来,就是一个个工人,而且他们每个工人都拥有单独的后台线程。
0.1 典型场景
后台作业
- 某个用户按下了报表按钮来生成一个需要长时间等待的报表。你添加这个工作到队列中,当报表生成完毕后,发送报表结果到该用户的邮箱。
- 在后台作业中发送一封邮件,有些问题可能会导致发送失败(网络连接异常,或者主机宕机);由于有后台作业以及持久化机制,在问题排除后,可以重试以保证任务的成功执行。
后台工作者
- 后台工作者能够周期性地执行旧日志的删除。
- 后台工作者可以周期性地筛选出非活跃性用户,并且发送回归邮件给这些用户。
1. 启动流程
后台作业与后台工作者都是通过各自的 manager(ibackgroundjobmanager
/ibackgroundworkermanager
) 来进行管理的。而这两个 manager 分别继承了 isingletondependency
接口,所以在启动的时候就会自动注入这两个管理器以便开发人员管理操作。
这里值得注意的一点是,ibackgroundjobmanager
接口是 ibackgroundworker
的派生接口,而 ibackgroudworker
是归属于 ibackgroundworkermanager
进行管理的。
所以,你可以在 abpkernelmodule
里面看到如下代码:
public sealed class abpkernelmodule : abpmodule { public override void postinitialize() { // 注册可能缺少的组件 registermissingcomponents(); // ... 忽略的代码 // 各种管理器的初始化操作 // 从配置项中读取,是否启用了后台作业功能 if (configuration.backgroundjobs.isjobexecutionenabled) { var workermanager = iocmanager.resolve<ibackgroundworkermanager>(); // 开始启动后台工作者 workermanager.start(); // 增加后台作业管理器 workermanager.add(iocmanager.resolve<ibackgroundjobmanager>()); } } }
可以看到,后台作业管理器是作为一个后台工作者被添加到了 ibackgroundworkermanager
当中来执行的。
2. 代码分析
2.1 后台工作者
2.1.1 后台工作者管理器
abp 通过后台工作者管理器来管理后台作业队列,所以我们首先来看一下后台工作者管理器接口的定义是什么样子的。
public interface ibackgroundworkermanager : irunnable { void add(ibackgroundworker worker); }
还是相当简洁的,就一个 add
方法用来添加一个新的后台工作者对象。只是在这个地方,可以看到该接口又是集成自 irunnable
接口,那么该接口的作用又是什么呢?
转到其定义可以看到,irunable
接口定义了三个基本的方法:start()
、stop()
、waitstop()
,而且他拥有一个默认实现 runablebase
,其实就是用来标识一个任务的运行状态。
public interface irunnable { // 开始执行任务 void start(); // 停止执行任务 void stop(); // 阻塞线程,等待任务执行完成后标识为停止。 void waittostop(); } public abstract class runnablebase : irunnable { // 用于标识任务是否运行的布尔值变量 public bool isrunning { get { return _isrunning; } } private volatile bool _isrunning; // 启动之后表示任务正在运行 public virtual void start() { _isrunning = true; } // 停止之后表示任务结束运行 public virtual void stop() { _isrunning = false; } public virtual void waittostop() { } }
到目前为止整个代码都还是比较简单清晰的,我们接着看 ibackgroundworkermanager
的默认实现 backgroundworkermanager
类,首先我们看一下该类拥有哪些属性与字段。
public class backgroundworkermanager : runnablebase, ibackgroundworkermanager, isingletondependency, idisposable { private readonly iiocresolver _iocresolver; private readonly list<ibackgroundworker> _backgroundjobs; public backgroundworkermanager(iiocresolver iocresolver) { _iocresolver = iocresolver; _backgroundjobs = new list<ibackgroundworker>(); } }
在后台工作者管理器类的内部,默认有一个 list 集合,用于维护所有的后台工作者对象。那么其他的 start()
等方法肯定是基于这个集合进行操作的。
public override void start() { base.start(); _backgroundjobs.foreach(job => job.start()); } public override void stop() { _backgroundjobs.foreach(job => job.stop()); base.stop(); } public override void waittostop() { _backgroundjobs.foreach(job => job.waittostop()); base.waittostop(); }
可以看到实现还是比较简单的,接下来我们继续看他的 add()
方法是如何进行操作的?
public void add(ibackgroundworker worker) { _backgroundjobs.add(worker); if (isrunning) { worker.start(); } }
在这里我们看到他会针对 isrunning
进行判定是否立即启动加入的后台工作者对象。而这个 isrunning
属性值唯一产生变化的情况就在于 start()
方法与 stop()
方法的调用。
最后肯定也有相关的销毁方法,用于释放所有注入的后台工作者对象,并将集合清除。
private bool _isdisposed; public void dispose() { if (_isdisposed) { return; } _isdisposed = true; // 遍历集合,通过 ioc 解析器的 release 方法释放对象 _backgroundjobs.foreach(_iocresolver.release); // 清空集合 _backgroundjobs.clear(); }
所以,针对于所有后台工作者的管理,都是通过 ibackgroundworkermanager
来进行操作的。
2.1.2 后台工作者
看完了管理器,我们来看一下 ibackgroundworker
后台工作者对象是怎样的构成。
public interface ibackgroundworker : irunnable { }
貌似只是一个空的接口,其作用主要是标识某个类型是否为后台工作者,转到其抽象类实现 backgroundworkerbase
,里面只是注入了一些辅助对象与本地化的一些方法。
public abstract class backgroundworkerbase : runnablebase, ibackgroundworker { // 配置管理器 public isettingmanager settingmanager { protected get; set; } // 工作单元管理器 public iunitofworkmanager unitofworkmanager { get { if (_unitofworkmanager == null) { throw new abpexception("must set unitofworkmanager before use it."); } return _unitofworkmanager; } set { _unitofworkmanager = value; } } private iunitofworkmanager _unitofworkmanager; // 获得当前的工作单元 protected iactiveunitofwork currentunitofwork { get { return unitofworkmanager.current; } } // 本地化资源管理器 public ilocalizationmanager localizationmanager { protected get; set; } // 默认的本地化资源的源名称 protected string localizationsourcename { get; set; } protected ilocalizationsource localizationsource { get { // 如果没有配置源名称,直接抛出异常 if (localizationsourcename == null) { throw new abpexception("must set localizationsourcename before, in order to get localizationsource"); } if (_localizationsource == null || _localizationsource.name != localizationsourcename) { _localizationsource = localizationmanager.getsource(localizationsourcename); } return _localizationsource; } } private ilocalizationsource _localizationsource; // 日志记录器 public ilogger logger { protected get; set; } protected backgroundworkerbase() { logger = nulllogger.instance; localizationmanager = nulllocalizationmanager.instance; } // ... 其他模板代码 }
我们接着看继承并实现了 backgroundworkerbase
的类型 periodicbackgroundworkerbase
,从字面意思上来看,该类型应该是一个定时后台工作者基类。
重点在于 periodic
(定时),从其类型内部的定义可以看到,该类型使用了一个 abptimer
对象来进行周期计时与具体工作任务的触发。我们暂时先不看这个 abptimer
,仅仅看 periodicbackgroundworkerbase
的内部实现。
public abstract class periodicbackgroundworkerbase : backgroundworkerbase { protected readonly abptimer timer; // 注入 abptimer protected periodicbackgroundworkerbase(abptimer timer) { timer = timer; // 绑定周期执行的任务,这里是 dowork() timer.elapsed += timer_elapsed; } public override void start() { base.start(); timer.start(); } public override void stop() { timer.stop(); base.stop(); } public override void waittostop() { timer.waittostop(); base.waittostop(); } private void timer_elapsed(object sender, system.eventargs e) { try { dowork(); } catch (exception ex) { logger.warn(ex.tostring(), ex); } } protected abstract void dowork(); }
可以看到,这里基类绑定了 dowork()
作为其定时执行的方法,那么用户在使用的时候直接继承自该基类,然后重写 dowork()
方法即可绑定自己的后台工作者的任务。
2.1.3 abptimer 定时器
在上面的基类我们看到,基类的 start()
、stop()
、waittpstop()
方法都是调用的 abptimer
所提供的,所以说 abptimer
其实也继承了 runablebase
基类并实现其具体的启动与停止操作。
其实 abptimer
的核心就是通过 clr 的 timer
来实现周期性任务执行的,不过默认的 timer
类有两个比较大的问题。
- clr 的
timer
并不会等待你的任务执行完再执行下一个周期的任务,如果你的某个任务耗时过长,超过了timer
定义的周期。那么timer
会开启一个新的线程执行,这样的话最后我们系统的资源会因为线程大量重复创建而被拖垮。 - 如何知道一个
timer
所执行的业务方法已经真正地被结束了。
所以 abp 才会重新封装一个 abptimer
作为一个基础的计时器。第一个问题的解决方法很简单,就是在执行具体绑定的业务方法之前,通过 timer.change()
方法来让 timer
临时失效。等待业务方法执行完成之后,再将 timer
的周期置为用户设定的周期。
// clr timer 绑定的回调方法 private void timercallback(object state) { lock (_tasktimer) { if (!_running || _performingtasks) { return; } // 暂时让 timer 失效 _tasktimer.change(timeout.infinite, timeout.infinite); // 设置执行标识为 true,表示当前的 abptimer 正在执行 _performingtasks = true; } try { // 如果绑定了相应的触发事件 if (elapsed != null) { // 执行相应的业务方法,这里就是最开始绑定的 dowork() 方法 elapsed(this, new eventargs()); } } catch { } finally { lock (_tasktimer) { // 标识业务方法执行完成 _performingtasks = false; if (_running) { // 更改周期为用户指定的执行周期,等待下一次触发 _tasktimer.change(period, timeout.infinite); } monitor.pulse(_tasktimer); } } }
针对于第二个问题,abp 通过 waittostop()
方法会阻塞调用这个 timer
的线程,并且在 _performingtasks
标识位是 false
的时候释放。
public override void waittostop() { // 锁定 clr 的 timer 对象 lock (_tasktimer) { // 循环检测 while (_performingtasks) { monitor.wait(_tasktimer); } } base.waittostop(); }
至于其他的 start()
方法就是使用 clr 的 timer
更改其执行周期,而 stop()
就是直接将 timer
的周期设置为无限大,使计时器失效。
2.1.4 总结
abp 后台工作者的核心就是通过 abptimer
来实现周期性任务的执行,用户只需要继承自 periodicbackgroundworkerbase
,然后将其添加到 ibackgroundworkermanager
的集合当中。这样 abp 在启动之后就会遍历这个工作者集合,然后周期执行这些后台工作者绑定的方法。
当然如果你继承了 periodicbackgroundworkerbase
之后,可以通过设置构造函数的 abptimer
来指定自己的执行周期。
2.2 后台作业队列
后台工作队列的管理是通过 ibackgroundjobmanager
来处理的,而该接口又继承自 ibackgroundworker
,所以一整个后台作业队列就是一个后台工作者,只不过这个工作者有点特殊。
2.2.1 后台作业管理器
ibackgroundjobmanager
接口的定义其实就两个方法,一个 enqueueasync<tjob, targs>()
用于将一个后台作业加入到执行队列当中。而 deleteasync()
方法呢,顾名思义就是从队列当中移除指定的后台作业。
首先看一下其默认实现 backgroundjobmanager
,该实现同样是继承自 periodicbackgroundworkerbase
并且其默认周期为 5000 ms。
public class backgroundjobmanager : periodicbackgroundworkerbase, ibackgroundjobmanager, isingletondependency { // 事件总线 public ieventbus eventbus { get; set; } // 轮训后台作业的间隔,默认值为 5000 毫秒. public static int jobpollperiod { get; set; } // ioc 解析器 private readonly iiocresolver _iocresolver; // 后台作业队列存储 private readonly ibackgroundjobstore _store; static backgroundjobmanager() { jobpollperiod = 5000; } public backgroundjobmanager( iiocresolver iocresolver, ibackgroundjobstore store, abptimer timer) : base(timer) { _store = store; _iocresolver = iocresolver; eventbus = nulleventbus.instance; timer.period = jobpollperiod; } }
基础结构基本上就这个样子,接下来看一下他的两个接口方法是如何实现的。
enqueueasync<tjob, targs>
方法通过传入指定的后台作业对象和相应的参数,同时还有任务的优先级。将其通过 ibackgroundjobstore
进行持久化,并返回一个任务的唯一 jobid 以便进行删除操作。
public async task<string> enqueueasync<tjob, targs>(targs args, backgroundjobpriority priority = backgroundjobpriority.normal, timespan? delay = null) where tjob : ibackgroundjob<targs> { // 通过 jobinfo 包装任务的基本信息 var jobinfo = new backgroundjobinfo { jobtype = typeof(tjob).assemblyqualifiedname, jobargs = args.tojsonstring(), priority = priority }; // 如果需要延时执行的话,则用当前时间加上延时的时间作为任务下次运行的时间 if (delay.hasvalue) { jobinfo.nexttrytime = clock.now.add(delay.value); } // 通过 store 进行持久话存储 await _store.insertasync(jobinfo); // 返回后台任务的唯一标识 return jobinfo.id.tostring(); }
至于删除操作,在 manager 内部其实也是通过 ibackgroundjobstore
进行实际的删除操作的。
public async task<bool> deleteasync(string jobid) { // 判断 jobid 的值是否有效 if (long.tryparse(jobid, out long finaljobid) == false) { throw new argumentexception($"the jobid '{jobid}' should be a number.", nameof(jobid)); } // 使用 jobid 从 store 处筛选到 jobinfo 对象的信息 backgroundjobinfo jobinfo = await _store.getasync(finaljobid); if (jobinfo == null) { return false; } // 如果存在有 jobinfo 则使用 store 进行删除操作 await _store.deleteasync(jobinfo); return true; }
后台作业管理器实质上是一个周期性执行的后台工作者,那么我们的后台作业是每 5000 ms 执行一次,那么他的 dowork()
方法又在执行什么操作呢?
protected override void dowork() { // 从 store 当中获得等待执行的后台作业集合 var waitingjobs = asynchelper.runsync(() => _store.getwaitingjobsasync(1000)); // 遍历这些等待执行的后台任务,然后通过 tryprocessjob 进行执行 foreach (var job in waitingjobs) { tryprocessjob(job); } }
可以看到每 5 秒钟我们的后台作业管理器就会从 ibackgroundjobstore
当中拿到最大 1000 条的后台作业信息,然后遍历这些信息。通过 tryprocessjob(job)
方法来执行后台作业。
而 tryprocessjob()
方法,本质上就是通过反射构建出一个 ibackgroundjob
对象,然后取得序列化的参数值,通过反射得到的 methodinfo
对象来执行我们的后台任务。执行完成之后,就会从 store 当中移除掉执行完成的任务。
针对于在执行过程当中所出现的异常,会通过 ieventbus
触发一个 abphandledexceptiondata
事件记录后台作业执行失败时的异常信息。并且一旦在执行过程当中出现了任何异常的情况,都会将该任务的 isabandoned
字段置为 true
,当该字段为 true
时,该任务将不再回被执行。
ps:就是在
getwaitingjobsasync()
方法时,会过滤掉 isabandoned 值为true
的任务。
private void tryprocessjob(backgroundjobinfo jobinfo) { try { // 任务执行次数自增 1 jobinfo.trycount++; // 最后一次执行时间设置为当前时间 jobinfo.lasttrytime = clock.now; // 通过反射取得后台作业的类型 var jobtype = type.gettype(jobinfo.jobtype); // 通过 ioc 解析器得到一个临时的后台作业对象,执行完之后既被释放 using (var job = _iocresolver.resolveasdisposable(jobtype)) { try { // 通过反射得到后台作业的 execute 方法 var jobexecutemethod = job.object.gettype().gettypeinfo().getmethod("execute"); var argstype = jobexecutemethod.getparameters()[0].parametertype; var argsobj = jsonconvert.deserializeobject(jobinfo.jobargs, argstype); // 结合持久话存储的参数信息,调用 execute 方法进行后台作业 jobexecutemethod.invoke(job.object, new[] { argsobj }); // 执行完成之后从 store 删除该任务的信息 asynchelper.runsync(() => _store.deleteasync(jobinfo)); } catch (exception ex) { logger.warn(ex.message, ex); // 计算下一次执行的时间,一旦超过 2 天该任务都执行失败,则返回 null var nexttrytime = jobinfo.calculatenexttrytime(); if (nexttrytime.hasvalue) { jobinfo.nexttrytime = nexttrytime.value; } else { // 如果为 null 则说明该任务在 2 天的时间内都没有执行成功,则放弃继续执行 jobinfo.isabandoned = true; } // 更新 store 存储的任务信息 tryupdate(jobinfo); // 触发异常事件 eventbus.trigger( this, new abphandledexceptiondata( new backgroundjobexception( "a background job execution is failed. see inner exception for details. see backgroundjob property to get information on the background job.", ex ) { backgroundjob = jobinfo, jobobject = job.object } ) ); } } } catch (exception ex) { logger.warn(ex.tostring(), ex); // 表示任务不再执行 jobinfo.isabandoned = true; // 更新 store tryupdate(jobinfo); } }
2.2.2 后台作业
后台作业的默认接口定义为 ibackgroundjob<in targs>
,他只有一个 execute(targs args)
方法,用于接收指定类型的作业参数,并执行。
一般来说我们不建议直接通过继承 ibackgroundjob<in targs>
来实现后台作业,而是继承自 backgroundjob<targs>
抽象类。该抽象类内部也没有什么特别的实现,主要是注入了一些基础设施,比如说 uow 与 本地化资源管理器,方便我们开发使用。
后台作业本身是具体执行的对象,而 backgroundjobinfo
则是存储了后台作业的 type 类型和参数,方便在需要执行的时候通过反射的方式执行后台作业。
2.2.2 后台作业队列存储
从 ibackgroundjobstore
我们就可以猜到以 abp 框架的套路,他肯定会有两种实现,第一种就是基于内存的 inmemorybackgroundjobstore
。而第二种呢,就是由 abp.zero 模块所提供的基于数据库的 backgroundjobstore
。
ibackgroundjobstore
接口所定义的方法基本上就是增删改查,没有什么复杂的。
public interface ibackgroundjobstore { // 通过 jobid 获取后台任务信息 task<backgroundjobinfo> getasync(long jobid); // 插入一个新的后台任务信息 task insertasync(backgroundjobinfo jobinfo); /// <summary> /// gets waiting jobs. it should get jobs based on these: /// conditions: !isabandoned and nexttrytime <= clock.now. /// order by: priority desc, trycount asc, nexttrytime asc. /// maximum result: <paramref name="maxresultcount"/>. /// </summary> /// <param name="maxresultcount">maximum result count.</param> task<list<backgroundjobinfo>> getwaitingjobsasync(int maxresultcount); /// <summary> /// deletes a job. /// </summary> /// <param name="jobinfo">job information.</param> task deleteasync(backgroundjobinfo jobinfo); /// <summary> /// updates a job. /// </summary> /// <param name="jobinfo">job information.</param> task updateasync(backgroundjobinfo jobinfo); }
这里先从简单的内存 store 说起,这个 inmemorybackgroundjobstore
内部使用了一个并行字典来存储这些任务信息。
public class inmemorybackgroundjobstore : ibackgroundjobstore { private readonly concurrentdictionary<long, backgroundjobinfo> _jobs; private long _lastid; public inmemorybackgroundjobstore() { _jobs = new concurrentdictionary<long, backgroundjobinfo>(); } }
相当简单,这几个接口方法基本上就是针对与这个并行字典操作的一层封装。
public task<backgroundjobinfo> getasync(long jobid) { return task.fromresult(_jobs[jobid]); } public task insertasync(backgroundjobinfo jobinfo) { jobinfo.id = interlocked.increment(ref _lastid); _jobs[jobinfo.id] = jobinfo; return task.fromresult(0); } public task<list<backgroundjobinfo>> getwaitingjobsasync(int maxresultcount) { var waitingjobs = _jobs.values // 首先筛选出不再执行的后台任务 .where(t => !t.isabandoned && t.nexttrytime <= clock.now) // 第一次根据后台作业的优先级进行排序,高优先级优先执行 .orderbydescending(t => t.priority) // 再根据执行次数排序,执行次数越少的,越靠前 .thenby(t => t.trycount) .thenby(t => t.nexttrytime) .take(maxresultcount) .tolist(); return task.fromresult(waitingjobs); } public task deleteasync(backgroundjobinfo jobinfo) { _jobs.tryremove(jobinfo.id, out _); return task.fromresult(0); } public task updateasync(backgroundjobinfo jobinfo) { // 如果是不再执行的任务,删除 if (jobinfo.isabandoned) { return deleteasync(jobinfo); } return task.fromresult(0); }
至于持久化到数据库,无非是注入一个仓储,然后针对这个仓储进行增删查改的操作罢了,这里就不在赘述。
2.2.3 后台作业优先级
后台作业的优先级定义在 backgroundjobpriority
枚举当中,一共有 5 个等级,分别是 low
、belownormal
、normal
、abovenormal
、high
,他们从最低到最高排列。
3.
上一篇: 我们币圈这么吹牛逼
下一篇: mybatis学习使用4动态sql