java实现任务调度
最近的一个小项目是做一个简单的数据仓库,需要将其他数据库的数据抽取出来,并通过而出抽取成页面需要的数据,以空间换时间的方式,让后端报表查询更快。
因为在抽取的过程中,有一定的先后顺序,需要做一个任务调度器,某一优先级的会先执行,然后会进入下一个优先级的队列任务中。
先定义了一个map的集合,key是优先级,value是任务的集合,某一个优先级内的任务是并发执行的,而不同优先级是串行执行的,前一个优先级执行完之后,后面的才会执行。
concurrenthashmap<integer/* 优先级. */, list<basetask>/* 任务集合. */> tasks = new concurrenthashmap<>();
这个调度管理有一个演进的过程,我先说第一个,这个是比较好理解的。
第一个版本:
首先对tasks集合中的key进行一个排序,我定义的是数字越小就有限执行,则进行遍历key值,并取出某个优先级的任务队列,执行任务队列的任务。任务的执行交给线程池去执行,在遍历内部,需要不断的检查这个队列中的任务是否都执行了,没有则一直等待否则进入到下个队列,任务执行的时候可能会抛出异常,但是不管任务是否异常,都将任务状态设置已执行。
下面是其核心代码:
public void run() { //对key值进行排序 enumeration<integer> keys = tasks.keys(); list<integer> prioritys = new arraylist<>(); while (keys.hasmoreelements()) { prioritys.add(keys.nextelement()); } collections.sort(prioritys);//升序 //对key进行遍历,执行某个某个优先级的任务队列 for (integer priority : prioritys) { list<basetask> tasklist = tasks.get(priority); if (tasklist.isempty()) { continue; } logger.info("execute priority {} task ", tasklist.get(0).priority); for (basetask task : tasklist) { executor.execute(() -> { try { task.dotask(); } catch (exception e) { e.printstacktrace(); } });//线程中执行任务 } while (true) {//等待所有线程都执行完成之后执行下一个任务队列 boolean finish = true; for (basetask t : tasklist) { if (!t.finish) { finish = false; } } if (finish) {//当前任务都执行完毕 break; } misc.sleep(1000);//thread.sleep(1000) } misc.sleep(1000); } }
关键代码很好理解,在任务执行之前,需要对所有任务都初始化,初始化的时候给出每个任务的优先级和任务名称,任务抽象类如下:
public abstract class basetask { public string taskname;//任务名称 public integer priority; //优先级 public boolean finish; //任务完成? /** * 执行的任务 */ public abstract void dotask(date date) throws exception;
第一个版本的思路很简单。
第二个版本稍微有一点点复杂。这里主要介绍该版本的内容,后续将代码的链接附上。
程序是由springboot搭建起来的,定时器是spring内置的轻量级的quartz,使用aop方式拦截异常,使用注解的方式在任务初始化时设置任务的初始变量。使用eventbus解耦程序,其中程序简单实现邮件发送功能(该功能还需要自己配置参数),以上这些至少需要简单的了解一下。
程序的思路:在整个队列执行过程中会有多个管道,某个队列上的管道任务执行完成,可以直接进行到下一个队列中执行,也设置了等待某一个队列上的所有任务都执行完成才执行当前任务。在某个队列任务中会标识某些任务是一队的,其他的为另一队,当这一队任务执行完成,就可以到下一个队列中去,不需要等待另一队。
这里会先初始化每个队列的每个队的条件,这个条件就是每个队的任务数,执行完成减1,当为0时,就进入下一个队列中。
分四个步骤进行完成:
1.bean的初始化
2.条件的设置
3.任务的执行
4.任务异常和任务执行完成之后通知检查是否执行下一个队列的任务
1.bean的初始化
1.创建注解类
@retention(retentionpolicy.runtime) @target(elementtype.type) @documented public @interface taskannotation { int priority() default 0;//优先级 string taskname() default "";//任务名称 taskqueueenum[] queuename() default {};//队列名称 }
2.实现beanpostprocessor,该接口是中有两个方法postprocessbeforeinitialization和postprocessafterinitialization,分别是bean初始化之前和bean初始化之后做的事情。
annotation[] annotations = bean.getclass().getannotations();//获取类上的注解 if (arrayutils.isempty(annotations)) {//注解为空时直接返回(不能返回空,否则bean不会被加载) return bean; } for (annotation annotation : annotations) { if (annotation.annotationtype().equals(taskannotation.class)) { taskannotation taskannotation = (taskannotation) annotation;//强转 try { field[] fields = target.getclass().getfields();//需要通过反射将值进行修改,下面的操作仅仅是对象的引用 if (!arrayutils.isempty(fields)) { for (field f : fields) { f.setaccessible(true); if (f.getname().equals("priority")) { f.set(target, taskannotation.priority()); } } } } }
上面需要注意的一点是需要通过反射的机制给bean设置值,不能直接调用bean的方式set值,否则bean的值是空的。
上面的代码通过实现beanpostprocessor后置处理器,处理任务上的注解,完成对任务的初始化的。
2.条件的初始化
创建条件类,提供初始化的方法。
public abstract class basetask { public int nextpriority;//子级节点的优先级 public string taskname;//任务名称 public integer priority; //优先级 public string queuename;//队列名称 public boolean finish; //任务完成? public boolean allexecute; /** * 执行的任务 */ public abstract void dotask(date date) throws exception; //任务完成之后,通过eventbus发送通知,是否需要执行下一个队列 public void notifyexecutetaskmsg(eventbus eventbus, date date) { eventnotifyexecutetaskmsg msg = new eventnotifyexecutetaskmsg(); msg.setdate(date); msg.setnextpriority(nextpriority); msg.setqueuename(queuename); msg.setpriority(priority); msg.settaskname(taskname); eventbus.post(msg); } } public class taskexecutecondition { private concurrenthashmap<string, atomicinteger> executemap = new concurrenthashmap<>(); /** * 初始化,每个队列进行分组,每个组的任务数量放入map集合中. */ public void init(concurrenthashmap<integer, list<basetask>> tasks) { enumeration<integer> keys = tasks.keys(); list<integer> prioritys = new arraylist<>(); while (keys.hasmoreelements()) { prioritys.add(keys.nextelement()); } collections.sort(prioritys);//升序 for (integer priority : prioritys) { list<basetask> list = tasks.get(priority); if (list.isempty()) { continue; } //对每个队列进行分组 map<string, list<basetask>> collect = list.stream() .collect(collectors.groupingby(x -> x.queuename, collectors.tolist())); for (entry<string, list<basetask>> entry : collect.entryset()) { for (basetask task : entry.getvalue()) { addcondition(task.priority, task.queuename); } } } } /** * 执行任务完成,条件减1 */ public boolean executetask(integer priority, string queuename) { string name = this.getqueue(priority, queuename); atomicinteger count = executemap.get(name); int sum = count.decrementandget(); if (sum == 0) { return true; } return false; } /** * 对个某个队列的条件 */ public int getcondition(integer priority, string queuename) { string name = this.getqueue(priority, queuename); return executemap.get(name).get(); } private void addcondition(integer priority, string queuename) { string name = this.getqueue(priority, queuename); atomicinteger count = executemap.get(name); if (count == null) { count = new atomicinteger(0); executemap.put(name, count); } count.incrementandget(); } private void addcondition(integer priority, string queuename, int sum) { string name = this.getqueue(priority, queuename); atomicinteger count = executemap.get(name); if (count == null) { count = new atomicinteger(sum); executemap.put(name, count); } else { count.set(sum); } } private string getqueue(integer priority, string queuename) { return priority + queuename; } /** * 清除队列 */ public void clear() { this.executemap.clear(); } }
3.任务的执行
任务执行类提供run方法,执行第一个队列,并提供获取下一个队列优先级方法,执行某个队列某个组的方法。
public class scheduletask { private static final logger logger = loggerfactory.getlogger(scheduletask.class); public concurrenthashmap<integer/* 优先级. */, list<basetask>/* 任务集合. */> tasks = new concurrenthashmap<>(); @autowired private threadpooltaskexecutor executor;//线程池 //任务会先执行第一队列的任务. public void run(date date) { enumeration<integer> keys = tasks.keys(); list<integer> prioritys = new arraylist<>(); while (keys.hasmoreelements()) { prioritys.add(keys.nextelement()); } collections.sort(prioritys);//升序 integer priority = prioritys.get(0); executetask(priority, date);//执行第一行的任务. } //获取下一个队列的优先级 public integer nextpriority(integer priority) { enumeration<integer> keys = tasks.keys(); list<integer> prioritys = new arraylist<>(); while (keys.hasmoreelements()) { prioritys.add(keys.nextelement()); } collections.sort(prioritys);//升序 for (integer pri : prioritys) { if (priority < pri) { return pri; } } return null;//没有下一个队列 } public void executetask(integer priority) { list<basetask> list = tasks.get(priority); if (list.isempty()) { return; } for (basetask task : list) { execute(task); } } //执行某个队列的某个组 public void executetask(integer priority, string queuename) { list<basetask> list = this.tasks.get(priority); list = list.stream().filter(task -> queuename.equals(task.queuename)) .collect(collectors.tolist()); if (list.isempty()) { return; } for (basetask task : list) { execute(task); } } public void execute(basetask task) { executor.execute(() -> { try { task.dotask(date);// } catch (exception e) {//异常处理已经aop拦截处理 } });//线程中执行任务 } /** * 增加任务 */ public void addtask(basetask task) { list<basetask> basetasks = tasks.get(task.priority); if (basetasks == null) { basetasks = new arraylist<>(); list<basetask> putifabsent = tasks.putifabsent(task.priority, basetasks); if (putifabsent != null) { basetasks = putifabsent; } } basetasks.add(task); } /** * 将任务结束标识重新设置 */ public void finishtask() { tasks.foreach((key, value) -> { for (basetask task : value) { task.finish = false; } }); } }
4.任务异常和任务执行完成之后通知检查是否执行下一个队列的任务
public class eventnotifyexecutetasklistener { private static final logger logger = loggerfactory .getlogger(eventnotifyexecutetasklistener.class); @autowired private scheduletask scheduletask; @autowired private taskexecutecondition condition; @subscribe public void executetask(eventnotifyexecutetaskmsg msg) { //当前队列的某组内容是否都执行完成 boolean success = condition.executetask(msg.getpriority(), msg.getqueuename()); if (success) { integer nextpriority = scheduletask.nextpriority(msg.getpriority()); if (nextpriority != null) { scheduletask.executetask(nextpriority, msg.getqueuename(), msg.getdate());//执行下一个队列 } else {//执行完成,重置任务标识 scheduletask.finishtask(); logger.info("coretask end!"); } } } }
整个思路介绍到这里,那么接下来是整个项目中出现的一些问题
1.beanpostprocessor与aop一起使用时,postprocessafterinitialization调用之后获取的bean分为不同的了,一个是jdk原生实体对象,一种是aop注解下的类会被cglib代理,生成带有后缀的对象,如果通过这个对象时反射获取类的注解,字段和方法,就获取不到,在代码中,需要将其转化一下,将cglib代理之后的类转化为不带后缀的对象。
2.postprocessafterinitialization的参数bean不能直接设置值,就是如下:
taskannotation taskannotation = (taskannotation) annotation;//强转 basetask basetask = (basetask) bean;//强转 basetask.priority = taskannotation.priority();
在使用对象时,其中对象的字段时为空的,并需要通过反射的方式去设置字段的值。
上面仅仅只是个人的想法,如果有更好的方式,或者有某些地方可以进行改进的,我们可以共同探讨一下。
链接地址:https://github.com/wangice/task-scheduler
程序中使用了一个公共包:https://github.com/wangice/misc
上一篇: 用批处理列出所有开机启动项的命令