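Spring Boot Scheduled Tasks: How They Work and How to Create Them Dynamically
I. Preface
Most projects need scheduled tasks to run certain jobs at fixed times. This article explains how Spring Boot scheduled tasks work internally and how to create them dynamically.
Last week at work I received a requirement: synchronize account-cancellation data from several provinces and unbind the corresponding WeChat followers. Each province periodically uploads its cancellation data to an SFTP server, and I had to develop scheduled tasks that parse those files. Because several provinces are involved, the servers, file-naming rules, and data rules are not necessarily the same, so making the solution configurable is somewhat difficult. The data rules have to be unified across provinces; the server and file-naming rules can be read from the configuration center. Whenever the configuration for a new province is added, the backend detects it and creates a scheduled task dynamically.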
II. Core configuration that Spring Boot uses to enable scheduled tasks
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }
}
Next, let's look at the core post-processor: ScheduledAnnotationBeanPostProcessor.
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
    if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
            bean instanceof ScheduledExecutorService) {
        // Ignore AOP infrastructure such as scoped proxies.
        return bean;
    }

    Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    if (!this.nonAnnotatedClasses.contains(targetClass)) {
        Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                            method, Scheduled.class, Schedules.class);
                    return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                });
        if (annotatedMethods.isEmpty()) {
            this.nonAnnotatedClasses.add(targetClass);
            if (logger.isTraceEnabled()) {
                logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
            }
        }
        else {
            // Non-empty set of methods
            annotatedMethods.forEach((method, scheduledMethods) ->
                    scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
            if (logger.isTraceEnabled()) {
                logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                        "': " + annotatedMethods);
            }
        }
    }
    return bean;
}
1. It processes the @Scheduled annotations and registers the scheduled tasks through ScheduledTaskRegistrar.
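To make the annotation-scanning step more concrete, here is a minimal plain-JDK sketch of the idea. It is not Spring's implementation: the @Every annotation, the ReportJob class, and the scan method are hypothetical names introduced only for illustration, and plain reflection stands in for MethodIntrospector.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

final class AnnotationScanSketch {

    /** Hypothetical stand-in for @Scheduled; it only carries a fixed delay. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Every {
        long millis();
    }

    /** Hypothetical bean with one annotated method and one plain method. */
    static class ReportJob {
        @Every(millis = 5000)
        public void syncFiles() {
        }

        public void helper() {
        }
    }

    /** Returns method name -> configured delay for every annotated method of the type. */
    static Map<String, Long> scan(Class<?> type) {
        Map<String, Long> found = new TreeMap<>();
        for (Method method : type.getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                // A real post-processor would register a task here instead of recording it.
                found.put(method.getName(), every.millis());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(scan(ReportJob.class)); // prints {syncFiles=5000}
    }
}
```

A post-processor built this way would call scan once per bean class and cache classes without annotated methods, which is the role nonAnnotatedClasses plays in the listing above.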
private void finishRegistration() {
    if (this.scheduler != null) {
        this.registrar.setScheduler(this.scheduler);
    }

    if (this.beanFactory instanceof ListableBeanFactory) {
        Map<String, SchedulingConfigurer> beans =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
        List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
        AnnotationAwareOrderComparator.sort(configurers);
        for (SchedulingConfigurer configurer : configurers) {
            configurer.configureTasks(this.registrar);
        }
    }

    if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
        Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
        try {
            // Search for TaskScheduler bean...
            this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
        }
        catch (NoUniqueBeanDefinitionException ex) {
            logger.trace("Could not find unique TaskScheduler bean", ex);
            try {
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
            }
            catch (NoSuchBeanDefinitionException ex2) {
                if (logger.isInfoEnabled()) {
                    logger.info("More than one TaskScheduler bean exists within the context, and " +
                            "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                            "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                            "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                            ex.getBeanNamesFound());
                }
            }
        }
        catch (NoSuchBeanDefinitionException ex) {
            logger.trace("Could not find default TaskScheduler bean", ex);
            // Search for ScheduledExecutorService bean next...
            try {
                this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex2) {
                logger.trace("Could not find unique ScheduledExecutorService bean", ex2);
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                }
                catch (NoSuchBeanDefinitionException ex3) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex2.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex2) {
                logger.trace("Could not find default ScheduledExecutorService bean", ex2);
                // Giving up -> falling back to default scheduler within the registrar...
                logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
            }
        }
    }

    this.registrar.afterPropertiesSet();
}
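1. Every SchedulingConfigurer bean gets a chance to configure the ScheduledTaskRegistrar dynamically, in order.
2. A TaskScheduler is registered with the ScheduledTaskRegistrar. It schedules Runnable tasks and supports several kinds of trigger rules.
3. registrar.afterPropertiesSet() is where all registered tasks are actually scheduled for execution.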
protected void scheduleTasks() {
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    if (this.triggerTasks != null) {
        for (TriggerTask task : this.triggerTasks) {
            addScheduledTask(scheduleTriggerTask(task));
        }
    }
    if (this.cronTasks != null) {
        for (CronTask task : this.cronTasks) {
            addScheduledTask(scheduleCronTask(task));
        }
    }
    if (this.fixedRateTasks != null) {
        for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
        }
    }
    if (this.fixedDelayTasks != null) {
        for (IntervalTask task : this.fixedDelayTasks) {
            addScheduledTask(scheduleFixedDelayTask(task));
        }
    }
}
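1. TriggerTask: a dynamic scheduled task. Trigger#nextExecutionTime determines the next execution time from the given trigger context.
2. CronTask: a dynamic scheduled task and a subclass of TriggerTask. A cron expression determines when the next execution is triggered.
3. IntervalTask: a task that runs periodically (at a fixed rate or with a fixed delay) after an initial delay.
4. If no TaskScheduler has been set, the default is a ConcurrentTaskScheduler backed by a single-threaded ScheduledExecutorService.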
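The practical consequence of the single-threaded default can be seen with the JDK alone. The sketch below is an illustration under stated assumptions, not Spring code: the class and method names are made up, and it uses Executors.newSingleThreadScheduledExecutor() directly, the same factory call that appears in scheduleTasks(). Two tasks are due at the same moment, yet the second one cannot start until the slow one has finished.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SingleThreadSchedulerSketch {

    /** Runs a slow and a fast task on a single-threaded scheduler and returns the event log. */
    static List<String> runTwoTasks() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<String> events = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        try {
            scheduler.schedule(() -> {
                events.add("slow-start");
                sleepQuietly(200);
                events.add("slow-end");
                done.countDown();
            }, 0, TimeUnit.MILLISECONDS);
            scheduler.schedule(() -> {
                events.add("fast");
                done.countDown();
            }, 0, TimeUnit.MILLISECONDS);
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdownNow();
        }
        return events;
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // The fast task only runs after the slow one has released the single thread.
        System.out.println(runTwoTasks()); // prints [slow-start, slow-end, fast]
    }
}
```

With a pool of two or more threads the "fast" entry could appear between the two "slow" entries, which is the motivation for supplying a larger scheduler when several tasks must not delay one another.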
III. How CronTask works
ScheduledTaskRegistrar.java
@Nullable
public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    if (this.taskScheduler != null) {
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    }
    else {
        addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
}

ConcurrentTaskScheduler.java
@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
    try {
        if (this.enterpriseConcurrentScheduler) {
            return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
        }
        else {
            ErrorHandler errorHandler =
                    (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
            return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
        }
    }
    catch (RejectedExecutionException ex) {
        throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
    }
}

ReschedulingRunnable.java
@Nullable
public ScheduledFuture<?> schedule() {
    synchronized (this.triggerContextMonitor) {
        this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
        if (this.scheduledExecutionTime == null) {
            return null;
        }
        long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
        this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
        return this;
    }
}

private ScheduledFuture<?> obtainCurrentFuture() {
    Assert.state(this.currentFuture != null, "No scheduled future");
    return this.currentFuture;
}

@Override
public void run() {
    Date actualExecutionTime = new Date();
    super.run();
    Date completionTime = new Date();
    synchronized (this.triggerContextMonitor) {
        Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
        this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
        if (!obtainCurrentFuture().isCancelled()) {
            schedule();
        }
    }
}
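1. In the end, the task and the trigger are both wrapped in a ReschedulingRunnable.
2. ReschedulingRunnable is what makes the task repeat: schedule() hands the object itself to the executor, the executor later calls run(), and run() calls schedule() again.
3. ReschedulingRunnable#schedule is synchronized on triggerContextMonitor, so only one thread at a time can compute the next execution time and submit the task to the executor.
4. Different ReschedulingRunnable objects do not affect one another as long as the thread pool has enough threads. In other words, when the pool is large enough, tasks registered through separate calls to TaskScheduler#schedule can run interleaved with each other.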
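The self-rescheduling loop described in the points above can be sketched with the JDK alone. This is a simplified illustration, not Spring's ReschedulingRunnable: the class name, the IntFunction-based "trigger" (which maps the number of completed runs to the next delay, or null to stop), and the helper method are all assumptions made for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

final class SelfReschedulingSketch implements Runnable {

    private final Runnable delegate;
    private final IntFunction<Long> nextDelayMillis; // completed runs -> delay, or null to stop
    private final ScheduledExecutorService executor;
    private final Object monitor = new Object();
    private int completedRuns;

    SelfReschedulingSketch(Runnable delegate, IntFunction<Long> nextDelayMillis,
                           ScheduledExecutorService executor) {
        this.delegate = delegate;
        this.nextDelayMillis = nextDelayMillis;
        this.executor = executor;
    }

    /** Asks the trigger for the next delay and queues this same object once more. */
    void schedule() {
        synchronized (monitor) {
            Long delay = nextDelayMillis.apply(completedRuns);
            if (delay == null) {
                return; // the trigger has no further execution
            }
            executor.schedule(this, delay, TimeUnit.MILLISECONDS);
        }
    }

    /** Called by the executor: run the task, then schedule the next round. */
    @Override
    public void run() {
        delegate.run();
        synchronized (monitor) {
            completedRuns++;
            schedule(); // re-entrant lock, same pattern as run() -> schedule() above
        }
    }

    /** Runs a counting task until the trigger stops after the given number of runs. */
    static int runUntilTriggerStops(int times) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(times);
        try {
            new SelfReschedulingSketch(
                    () -> { counter.incrementAndGet(); done.countDown(); },
                    runs -> runs < times ? Long.valueOf(10L) : null,
                    executor).schedule();
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(runUntilTriggerStops(3)); // prints 3
    }
}
```

Because each submission is a one-shot schedule() call rather than scheduleAtFixedRate, the delay can be recomputed after every run, which is what allows cron-style triggers.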
ScheduledThreadPoolExecutor.java
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null, triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}
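The original post illustrated how ScheduledFutureTask works with a figure borrowed from another source; the image is not reproduced here.
1. Each ScheduledFutureTask is placed in a blocking priority queue, ScheduledThreadPoolExecutor.DelayedWorkQueue, which is implemented as a binary min-heap.
2. The Thread object in that figure corresponds to ThreadPoolExecutor.Worker, which implements the Runnable interface.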
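The ordering that a min-heap gives to delayed tasks can be shown with java.util.PriorityQueue, which is also a binary heap. This is only a sketch of the ordering idea, not the DelayedWorkQueue implementation: the Entry class and its fields are hypothetical, and the comparator (trigger time first, insertion sequence as tie-breaker) is a simplified stand-in for how ScheduledFutureTask compares itself to other tasks.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class DelayHeapSketch {

    /** Hypothetical queue entry: a name, its trigger time, and an insertion sequence number. */
    static final class Entry {
        final String name;
        final long triggerAtMillis;
        final long sequence;

        Entry(String name, long triggerAtMillis, long sequence) {
            this.name = name;
            this.triggerAtMillis = triggerAtMillis;
            this.sequence = sequence;
        }
    }

    /** Adds entries in arbitrary order and drains them earliest-trigger-first. */
    static List<String> drainInTriggerOrder() {
        PriorityQueue<Entry> heap = new PriorityQueue<>(
                Comparator.comparingLong((Entry e) -> e.triggerAtMillis)
                        .thenComparingLong(e -> e.sequence));
        heap.add(new Entry("in-300ms", 300, 0));
        heap.add(new Entry("in-100ms", 100, 1));
        heap.add(new Entry("also-100ms", 100, 2));
        heap.add(new Entry("in-200ms", 200, 3));

        List<String> order = new ArrayList<>();
        while (!heap.isEmpty()) {
            // poll() always removes the root of the heap, i.e. the earliest entry.
            order.add(heap.poll().name);
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainInTriggerOrder());
        // prints [in-100ms, also-100ms, in-200ms, in-300ms]
    }
}
```

A worker thread that always takes the head of such a queue only ever has to wait for one deadline, the earliest one, regardless of how many tasks are queued.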
/**
 * Creates with given first task and thread from ThreadFactory.
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
    runWorker(this);
}
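1. A Worker holds a Thread object, and the Runnable of that Thread is the Worker itself.
2. ThreadPoolExecutor#addWorker creates a Worker, takes the Thread held by that Worker, and starts it. That is how a thread of the pool comes into existence.
3. Worker#run calls ThreadPoolExecutor#runWorker, which is where tasks are finally executed. Its flow is outlined below.
(1) The worker first runs the task that was passed in. If that task is null, then as long as the pool is in the running state, it fetches tasks from the workQueue through getTask(). ThreadPoolExecutor#execute offers a task to the workQueue when no new core thread can be created.
When getTask() takes a task from the queue, the pool configuration decides whether it blocks and for how long. If getTask() returns null, the runWorker loop ends and processWorkerExit is executed.
At that point the thread has fulfilled its purpose and "disappears" from the pool.
(2) Before a task starts, the worker's lock() method is called so that the task is not interrupted while it is executing. In the JDK source the author read, this is ensured by calling clearInterruptsForTaskRun, after which the thread no longer has its interrupt flag set. (That helper appears in older JDK 7 sources; in JDK 8 and later the equivalent check seems to be inlined in runWorker.)
(3) beforeExecute and afterExecute allow custom logic to run before and after each task. Both methods are empty and are meant to be filled in by subclasses.
If beforeExecute throws an exception, the task is not executed, and completedAbruptly is true when the loop is left, meaning "the worker died due to user exception"; decrementWorkerCount is then used to adjust the worker count.
(4) Because Runnable#run cannot declare checked Throwables, the exception is rewrapped here and thrown again. The thrown exception kills the current thread; afterExecute can be used to handle it.
(5) afterExecute may itself throw an exception, which can also kill the current thread.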
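The two hooks can be tried out with a small ThreadPoolExecutor subclass. The sketch below is illustrative only: the class names and log strings are made up, the pool is deliberately limited to one thread so the order of events is fixed, and a no-op uncaught-exception handler is installed purely to keep the failing task's stack trace out of the console.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class WorkerHooksSketch {

    /** Single-threaded pool that records what the hooks observe. */
    static final class LoggingPool extends ThreadPoolExecutor {
        final List<String> log = new CopyOnWriteArrayList<>();
        final CountDownLatch finished;

        LoggingPool(int expectedTasks) {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), runnable -> {
                Thread t = new Thread(runnable, "hooks-sketch");
                // Swallow the rethrown exception so the demo output stays clean.
                t.setUncaughtExceptionHandler((thread, error) -> { });
                return t;
            });
            this.finished = new CountDownLatch(expectedTasks);
        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            log.add("before");
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            // For tasks passed to execute(), t is the exception thrown by the task, if any.
            log.add(t == null ? "after:ok" : "after:" + t.getMessage());
            finished.countDown();
        }
    }

    /** Executes one successful and one failing task and returns the hook log. */
    static List<String> runOkThenFailing() {
        LoggingPool pool = new LoggingPool(2);
        try {
            pool.execute(() -> pool.log.add("task:ok"));
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            if (!pool.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return pool.log;
    }

    public static void main(String[] args) {
        System.out.println(runOkThenFailing());
        // prints [before, task:ok, after:ok, before, after:boom]
    }
}
```

Note that this applies to execute(); a task handed to submit() is wrapped in a Future that captures the exception, so afterExecute receives null in that case.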
IV. Creating scheduled tasks dynamically
TaskConfiguration: configuration class
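// The imports are not part of the original post; they are the packages these classes normally live in.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Role;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;

@Configuration
@EnableScheduling
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {

    // Registered under the default scheduler bean name so that finishRegistration() picks it up.
    @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledExecutorService scheduledAnnotationProcessor() {
        return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
    }

    /** Thread factory that gives scheduler threads a recognizable name. */
    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-schedule-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}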
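1. This ensures that ConcurrentTaskScheduler does not fall back to the default single-threaded ScheduledExecutor but uses a thread pool with corePoolSize=5.
2. The pool uses a custom thread factory class.
DynamicTask: dynamic scheduled task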
// The imports are not part of the original post; SLF4J and Apache Commons Lang 3 are assumed from the calls used.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PreDestroy;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.CronTask;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.util.CollectionUtils;

@Configuration
public class DynamicTask implements SchedulingConfigurer {

    private static final Logger logger = LoggerFactory.getLogger(DynamicTask.class);

    private static final ExecutorService es = new ThreadPoolExecutor(10, 20,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(10),
            new DynamicTaskConsumeThreadFactory());

    private volatile ScheduledTaskRegistrar registrar;
    private final ConcurrentHashMap<String, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, CronTask> cronTasks = new ConcurrentHashMap<>();

    // Copy-on-write, so callers can modify the list while the scheduler thread iterates over it.
    private final List<TaskConstant> taskConstants = new CopyOnWriteArrayList<>();

    @Override
    public void configureTasks(ScheduledTaskRegistrar registrar) {
        this.registrar = registrar;
        this.registrar.addTriggerTask(() -> {
                    // Note: an empty list skips the refresh, so tasks that are already scheduled keep running.
                    if (!CollectionUtils.isEmpty(taskConstants)) {
                        logger.info("Checking the dynamic task list...");
                        List<TimingTask> tts = new ArrayList<>();
                        taskConstants.forEach(taskConstant -> {
                            TimingTask tt = new TimingTask();
                            tt.setExpression(taskConstant.getCron());
                            tt.setTaskId("dynamic-task-" + taskConstant.getTaskId());
                            tts.add(tt);
                        });
                        this.refreshTasks(tts);
                    }
                },
                triggerContext -> new PeriodicTrigger(5L, TimeUnit.SECONDS).nextExecutionTime(triggerContext));
    }

    public List<TaskConstant> getTaskConstants() {
        return taskConstants;
    }

    private void refreshTasks(List<TimingTask> tasks) {
        // Cancel tasks whose configuration has been deleted
        for (String taskId : scheduledFutures.keySet()) {
            if (!exists(tasks, taskId)) {
                // Also drop the cached entries; otherwise a task that is added again
                // with the same id and cron would be treated as unchanged below.
                ScheduledFuture<?> removed = scheduledFutures.remove(taskId);
                if (removed != null) {
                    removed.cancel(false);
                }
                cronTasks.remove(taskId);
            }
        }
        for (TimingTask tt : tasks) {
            String expression = tt.getExpression();
            if (StringUtils.isBlank(expression) || !CronSequenceGenerator.isValidExpression(expression)) {
                logger.error("Invalid cron expression for DynamicTask: " + expression);
                continue;
            }
            // If the configuration is unchanged, there is no need to recreate the task
            if (scheduledFutures.containsKey(tt.getTaskId())
                    && cronTasks.get(tt.getTaskId()).getExpression().equals(expression)) {
                continue;
            }
            // If the execution time has changed, cancel the current task first
            if (scheduledFutures.containsKey(tt.getTaskId())) {
                scheduledFutures.remove(tt.getTaskId()).cancel(false);
                cronTasks.remove(tt.getTaskId());
            }
            CronTask task = new CronTask(tt, expression);
            ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
            cronTasks.put(tt.getTaskId(), task);
            scheduledFutures.put(tt.getTaskId(), future);
        }
    }

    private boolean exists(List<TimingTask> tasks, String taskId) {
        for (TimingTask task : tasks) {
            if (task.getTaskId().equals(taskId)) {
                return true;
            }
        }
        return false;
    }

    @PreDestroy
    public void destroy() {
        this.registrar.destroy();
    }

    public static class TaskConstant {
        private String cron;
        private String taskId;

        public String getCron() {
            return cron;
        }

        public void setCron(String cron) {
            this.cron = cron;
        }

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }
    }

    private class TimingTask implements Runnable {
        private String expression;
        private String taskId;

        public String getTaskId() {
            return taskId;
        }

        public void setTaskId(String taskId) {
            this.taskId = taskId;
        }

        @Override
        public void run() {
            logger.info("Current CronTask: " + this);
            // Bounded queue with capacity 3
            DynamicBlockingQueue queue = new DynamicBlockingQueue(3);
            es.submit(() -> {
                while (!queue.isDone() || !queue.isEmpty()) {
                    try {
                        String content = queue.poll(500, TimeUnit.MILLISECONDS);
                        if (StringUtils.isBlank(content)) {
                            // Timed out: re-check the loop condition instead of exiting,
                            // so the producer is never left blocked on a full queue.
                            continue;
                        }
                        logger.info("DynamicBlockingQueue consumed: " + content);
                        TimeUnit.MILLISECONDS.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });

            // Put data into the queue
            for (int i = 0; i < 5; ++i) {
                try {
                    queue.put(String.valueOf(i));
                    logger.info("DynamicBlockingQueue produced: " + i);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            queue.setDone(true);
        }

        public String getExpression() {
            return expression;
        }

        public void setExpression(String expression) {
            this.expression = expression;
        }

        @Override
        public String toString() {
            return ReflectionToStringBuilder.toString(this
                    , ToStringStyle.JSON_STYLE
                    , false
                    , false
                    , TimingTask.class);
        }
    }

    /**
     * Thread factory for the queue consumer threads
     */
    private static class DynamicTaskConsumeThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DynamicTaskConsumeThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-dynamic-task-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }

    private static class DynamicBlockingQueue extends LinkedBlockingQueue<String> {
        DynamicBlockingQueue(int capacity) {
            super(capacity);
        }

        private volatile boolean done = false;

        public boolean isDone() {
            return done;
        }

        public void setDone(boolean done) {
            this.done = done;
        }
    }
}
1. taskConstants is the list of dynamic tasks.
2. ScheduledTaskRegistrar#addTriggerTask adds a periodic task that checks the dynamic task list for changes.
CronTask task = new CronTask(tt, expression);
ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(tt.getTaskId(), task);
scheduledFutures.put(tt.getTaskId(), future);
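3. The cron task is created dynamically, and the returned ScheduledFuture instance is cached.
4. When the task list is refreshed, the cached ScheduledFuture and CronTask instances are used to decide whether an obsolete dynamic task should be cancelled and removed.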
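The refresh logic summarized in the points above (cancel what was removed, leave unchanged entries alone, reschedule what changed) does not depend on Spring and can be sketched with a plain ScheduledExecutorService. The following is a simplified illustration, not the article's DynamicTask: the class and method names are hypothetical, fixed-rate periods in milliseconds replace cron expressions, and the task body is an empty placeholder.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class DynamicRegistrySketch {

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private final Map<String, ScheduledFuture<?>> futures = new HashMap<>();
    private final Map<String, Long> periods = new HashMap<>();

    /** Brings the running tasks in line with the desired taskId -> period (ms) map. */
    synchronized void refresh(Map<String, Long> desired) {
        // Cancel and forget every task that is no longer configured.
        Iterator<Map.Entry<String, ScheduledFuture<?>>> it = futures.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ScheduledFuture<?>> entry = it.next();
            if (!desired.containsKey(entry.getKey())) {
                entry.getValue().cancel(false);
                periods.remove(entry.getKey());
                it.remove();
            }
        }
        for (Map.Entry<String, Long> entry : desired.entrySet()) {
            String id = entry.getKey();
            Long period = entry.getValue();
            if (period.equals(periods.get(id))) {
                continue; // unchanged configuration: keep the existing future
            }
            ScheduledFuture<?> old = futures.remove(id);
            if (old != null) {
                old.cancel(false); // the period changed: replace the old schedule
            }
            Runnable placeholder = () -> { }; // real work would go here
            futures.put(id, scheduler.scheduleAtFixedRate(placeholder, period, period, TimeUnit.MILLISECONDS));
            periods.put(id, period);
        }
    }

    synchronized Set<String> activeIds() {
        return new TreeSet<>(futures.keySet());
    }

    synchronized ScheduledFuture<?> futureOf(String id) {
        return futures.get(id);
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    /** Adds two tasks, then removes one and adds another, and reports the outcome. */
    static String demo() {
        DynamicRegistrySketch registry = new DynamicRegistrySketch();
        try {
            Map<String, Long> config = new HashMap<>();
            config.put("province-a", 1000L);
            config.put("province-b", 1000L);
            registry.refresh(config);
            ScheduledFuture<?> a = registry.futureOf("province-a");
            ScheduledFuture<?> b = registry.futureOf("province-b");

            config.remove("province-b");
            config.put("province-c", 500L);
            registry.refresh(config);

            return registry.activeIds()
                    + " bCancelled=" + b.isCancelled()
                    + " aKept=" + (a == registry.futureOf("province-a"));
        } finally {
            registry.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
        // prints [province-a, province-c] bCancelled=true aKept=true
    }
}
```

Keeping the configuration value next to the future is what allows the "unchanged" check; without it, every refresh would have to cancel and recreate all tasks.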
DynamicTaskTest: test class for the dynamic scheduled task
// The imports are not part of the original post; JUnit 4 and Spring Boot Test are assumed from the annotations used.
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class DynamicTaskTest {

    @Autowired
    private DynamicTask dynamicTask;

    @Test
    public void test() throws InterruptedException {
        List<DynamicTask.TaskConstant> taskConstans = dynamicTask.getTaskConstants();
        DynamicTask.TaskConstant taskConstant = new DynamicTask.TaskConstant();
        taskConstant.setCron("0/5 * * * * ?");
        taskConstant.setTaskId("test1");
        taskConstans.add(taskConstant);

        DynamicTask.TaskConstant taskConstant1 = new DynamicTask.TaskConstant();
        taskConstant1.setCron("0/5 * * * * ?");
        taskConstant1.setTaskId("test2");
        taskConstans.add(taskConstant1);

        DynamicTask.TaskConstant taskConstant2 = new DynamicTask.TaskConstant();
        taskConstant2.setCron("0/5 * * * * ?");
        taskConstant2.setTaskId("test3");
        taskConstans.add(taskConstant2);

        TimeUnit.SECONDS.sleep(40);
        // Remove the last configuration and add a new one
        taskConstans.remove(taskConstans.size() - 1);
        DynamicTask.TaskConstant taskConstant3 = new DynamicTask.TaskConstant();
        taskConstant3.setCron("0/5 * * * * ?");
        taskConstant3.setTaskId("test4");
        taskConstans.add(taskConstant3);
        // Uncomment to keep the test alive long enough to observe the refreshed tasks:
        // TimeUnit.MINUTES.sleep(50);
    }
}
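Summary
That is all for this article. I hope it is a useful reference for your study or work. If you have questions, feel free to leave a comment, and thank you for your support.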