欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spring的cron定时任务解析

程序员文章站 2022-07-12 20:07:55
...
在用spring定时任务时,配置如下:
<task:executor id="executor" pool-size="5" />
	<task:scheduler id="scheduler" pool-size="10" />
	<task:annotation-driven executor="executor" scheduler="scheduler" />


配置项schema为spring-task-3.0.xsd。
对某方法定义cron如下:
@Scheduled(cron = "0 */5 * * * ?")
	pulic void run(){
		//other codes…
}

因run方法是每5分钟跑一次,而run方法中具有复杂的业务逻辑(大概包括调用第三方httpservice,导数据到内存计算,再分次批量写库)。Task可能会运行超过5分钟,此种情况下,预期目标是和linux cron一样,定点执行task,而不用理会前一个task是否已完成。

运行一段时间后,发现少了几个点的数据,然后针对数据查看了相应的日志,发现每次都只会有一个shedule线程跑同一个run方法,而这个run方法处于wait状态或5分钟内没跑完,会导致后续的task不能按时启动。

查看spring源码,shedule流程初始化如下:
1.在bean初始化后,通过反射机制,对实例化的bean的class进行注解扫描,判断bean的class定义中@Scheduled注解。若有则判断调度模式,如本题中为cron调度模式。此实现代码位于:
ScheduledAnnotationBeanPostProcessor.postProcessAfterInitialization
在扫描到@Scheduled的方法后,把每一个目标method封装为Runnable,代码如下:
MethodInvokingRunnable runnable = new MethodInvokingRunnable();
					runnable.setTargetObject(bean);
					runnable.setTargetMethod(method.getName());
					runnable.setArguments(new Object[0]);
					try {
						runnable.prepare();
					}
					catch (Exception ex) {
						throw new IllegalStateException("failed to prepare task", ex);
					}


此postProcessAfterInitialization是维护crontasks,fixedRateTasks容器,针对每一个目标method生成相应的MethodInvokingRunnable对实例。

2.针对cron任务,对每一个MethodInvokingRunnable注册相应的CronTrigger,并通过ConcurrentTaskSchedule.shedule创建ReschedulingRunnable执行器并解析cron表达式添加第1个task。
代码实现在ScheduledTaskRegistrar. afterPropertiesSet。

3. ReschedulingRunnable自身 串联schdule。如下代码所示:
public ScheduledFuture schedule() {
		synchronized (this.triggerContextMonitor) {
			this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
			if (this.scheduledExecutionTime == null) {
				return null;
			}
			long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
			this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
			return this;
		}
	}

	@Override
	public void run() {
		Date actualExecutionTime = new Date();
		super.run();
		Date completionTime = new Date();
		synchronized (this.triggerContextMonitor) {
			this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
		}
		if (!this.currentFuture.isCancelled()) {
			schedule();
		}
	}


其中trigger.nextExecutionTime即获取下一个task的执行时间,并提交到executor执行器。

而见run中,是执行了当前的task后再根据corn表达式获取下一个时间点。如此若当前task超过5分钟则下一个5分钟时间点的任务不会被执行。而想要达到linux cron形式的task调度,则需要在此基础之上进行简单处理。
处理办法:
额外创建一个线程池executor如名称为A,在cron方法中提交runnable给这个A执行。
这样的情况可以满足和linux cron一样的定时需求,但得对task有独立性要求。若前后有存在依赖,或者可能导致数据一致性问题还是得慎重。

ps:依据spring中根据bean来解析相应class的schedule注解,如果相应class的实例为多个,就会导致同时有多个相同的task执行。