spring的cron定时任务解析
程序员文章站
2022-07-12 20:07:55
...
在用spring定时任务时,配置如下:
配置项schema为spring-task-3.0.xsd。
对某方法定义cron如下:
因run方法是每5分钟跑一次,而run方法中具有复杂的业务逻辑(大概包括调用第三方httpservice,导数据到内存计算,再分次批量写库)。Task可能会运行超过5分钟,此种情况下,预期目标是和linux cron一样,定点执行task,而不用理会前一个task是否已完成。
运行一段时间后,发现少了几个点的数据,然后针对数据查看了相应的日志,发现每次都只会有一个shedule线程跑同一个run方法,而这个run方法处于wait状态或5分钟内没跑完,会导致后续的task不能按时启动。
查看spring源码,shedule流程初始化如下:
1.在bean初始化后,通过反射机制,对实例化的bean的class进行注解扫描,判断bean的class定义中@Scheduled注解。若有则判断调度模式,如本题中为cron调度模式。此实现代码位于:
ScheduledAnnotationBeanPostProcessor.postProcessAfterInitialization
在扫描到@Scheduled的方法后,把每一个目标method封装为Runnable,代码如下:
此postProcessAfterInitialization是维护crontasks,fixedRateTasks容器,针对每一个目标method生成相应的MethodInvokingRunnable对实例。
2.针对cron任务,对每一个MethodInvokingRunnable注册相应的CronTrigger,并通过ConcurrentTaskSchedule.shedule创建ReschedulingRunnable执行器并解析cron表达式添加第1个task。
代码实现在ScheduledTaskRegistrar. afterPropertiesSet。
3. ReschedulingRunnable自身 串联schdule。如下代码所示:
其中trigger.nextExecutionTime即获取下一个task的执行时间,并提交到executor执行器。
而见run中,是执行了当前的task后再根据corn表达式获取下一个时间点。如此若当前task超过5分钟则下一个5分钟时间点的任务不会被执行。而想要达到linux cron形式的task调度,则需要在此基础之上进行简单处理。
处理办法:
额外创建一个线程池executor如名称为A,在cron方法中提交runnable给这个A执行。
这样的情况可以满足和linux cron一样的定时需求,但得对task有独立性要求。若前后有存在依赖,或者可能导致数据一致性问题还是得慎重。
ps:依据spring中根据bean来解析相应class的schedule注解,如果相应class的实例为多个,就会导致同时有多个相同的task执行。
<task:executor id="executor" pool-size="5" /> <task:scheduler id="scheduler" pool-size="10" /> <task:annotation-driven executor="executor" scheduler="scheduler" />
配置项schema为spring-task-3.0.xsd。
对某方法定义cron如下:
@Scheduled(cron = "0 */5 * * * ?") pulic void run(){ //other codes… }
因run方法是每5分钟跑一次,而run方法中具有复杂的业务逻辑(大概包括调用第三方httpservice,导数据到内存计算,再分次批量写库)。Task可能会运行超过5分钟,此种情况下,预期目标是和linux cron一样,定点执行task,而不用理会前一个task是否已完成。
运行一段时间后,发现少了几个点的数据,然后针对数据查看了相应的日志,发现每次都只会有一个shedule线程跑同一个run方法,而这个run方法处于wait状态或5分钟内没跑完,会导致后续的task不能按时启动。
查看spring源码,shedule流程初始化如下:
1.在bean初始化后,通过反射机制,对实例化的bean的class进行注解扫描,判断bean的class定义中@Scheduled注解。若有则判断调度模式,如本题中为cron调度模式。此实现代码位于:
ScheduledAnnotationBeanPostProcessor.postProcessAfterInitialization
在扫描到@Scheduled的方法后,把每一个目标method封装为Runnable,代码如下:
MethodInvokingRunnable runnable = new MethodInvokingRunnable(); runnable.setTargetObject(bean); runnable.setTargetMethod(method.getName()); runnable.setArguments(new Object[0]); try { runnable.prepare(); } catch (Exception ex) { throw new IllegalStateException("failed to prepare task", ex); }
此postProcessAfterInitialization是维护crontasks,fixedRateTasks容器,针对每一个目标method生成相应的MethodInvokingRunnable对实例。
2.针对cron任务,对每一个MethodInvokingRunnable注册相应的CronTrigger,并通过ConcurrentTaskSchedule.shedule创建ReschedulingRunnable执行器并解析cron表达式添加第1个task。
代码实现在ScheduledTaskRegistrar. afterPropertiesSet。
3. ReschedulingRunnable自身 串联schdule。如下代码所示:
public ScheduledFuture schedule() { synchronized (this.triggerContextMonitor) { this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext); if (this.scheduledExecutionTime == null) { return null; } long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis(); this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS); return this; } } @Override public void run() { Date actualExecutionTime = new Date(); super.run(); Date completionTime = new Date(); synchronized (this.triggerContextMonitor) { this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime); } if (!this.currentFuture.isCancelled()) { schedule(); } }
其中trigger.nextExecutionTime即获取下一个task的执行时间,并提交到executor执行器。
而见run中,是执行了当前的task后再根据corn表达式获取下一个时间点。如此若当前task超过5分钟则下一个5分钟时间点的任务不会被执行。而想要达到linux cron形式的task调度,则需要在此基础之上进行简单处理。
处理办法:
额外创建一个线程池executor如名称为A,在cron方法中提交runnable给这个A执行。
这样的情况可以满足和linux cron一样的定时需求,但得对task有独立性要求。若前后有存在依赖,或者可能导致数据一致性问题还是得慎重。
ps:依据spring中根据bean来解析相应class的schedule注解,如果相应class的实例为多个,就会导致同时有多个相同的task执行。