欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spring boot定时任务的原理及动态创建详解

程序员文章站 2024-02-28 08:59:22
v一、前言 定时任务一般是项目中都需要用到的,可以用于定时处理一些特殊的任务。这篇文章主要给大家介绍了关于spring boot定时任务的原理及动态创建的相关内容,下...

v一、前言

定时任务一般是项目中都需要用到的,可以用于定时处理一些特殊的任务。这篇文章主要给大家介绍了关于spring boot定时任务的原理及动态创建的相关内容,下面来一起看看详细的介绍吧

上周工作遇到了一个需求,同步多个省份销号数据,解绑微信粉丝。分省定时将销号数据放到sftp服务器上,我需要开发定时任务去解析文件。因为是多省份,服务器、文件名规则、数据规则都不一定,所以要做成可配置是有一定难度的。数据规则这块必须强烈要求统一,服务器、文件名规则都可以从配置中心去读。每新增一个省份的配置,后台感知到后,动态生成定时任务。

v二、springboot引入定时任务核心配置

@target(elementtype.type)
@retention(retentionpolicy.runtime)
@import(schedulingconfiguration.class)
@documented
public @interface enablescheduling {

}

@configuration
@role(beandefinition.role_infrastructure)
public class schedulingconfiguration {

 @bean(name = taskmanagementconfigutils.scheduled_annotation_processor_bean_name)
 @role(beandefinition.role_infrastructure)
 public scheduledannotationbeanpostprocessor scheduledannotationprocessor() {
 return new scheduledannotationbeanpostprocessor();
 }

}

接下来主要看一下这个核心后置处理器:scheduledannotationbeanpostprocessor 。

@override
public object postprocessafterinitialization(object bean, string beanname) {
 if (bean instanceof aopinfrastructurebean || bean instanceof taskscheduler ||
  bean instanceof scheduledexecutorservice) {
 // ignore aop infrastructure such as scoped proxies.
 return bean;
 }

 class<?> targetclass = aopproxyutils.ultimatetargetclass(bean);
 if (!this.nonannotatedclasses.contains(targetclass)) {
 map<method, set<scheduled>> annotatedmethods = methodintrospector.selectmethods(targetclass,
  (methodintrospector.metadatalookup<set<scheduled>>) method -> {
   set<scheduled> scheduledmethods = annotatedelementutils.getmergedrepeatableannotations(
    method, scheduled.class, schedules.class);
   return (!scheduledmethods.isempty() ? scheduledmethods : null);
  });
 if (annotatedmethods.isempty()) {
  this.nonannotatedclasses.add(targetclass);
  if (logger.istraceenabled()) {
  logger.trace("no @scheduled annotations found on bean class: " + targetclass);
  }
 }
 else {
  // non-empty set of methods
  annotatedmethods.foreach((method, scheduledmethods) ->
   scheduledmethods.foreach(scheduled -> processscheduled(scheduled, method, bean)));
  if (logger.istraceenabled()) {
  logger.trace(annotatedmethods.size() + " @scheduled methods processed on bean '" + beanname +
   "': " + annotatedmethods);
  }
 }
 }
 return bean;
}

1、处理scheduled注解,通过scheduledtaskregistrar注册定时任务。

private void finishregistration() {
 if (this.scheduler != null) {
 this.registrar.setscheduler(this.scheduler);
 }

 if (this.beanfactory instanceof listablebeanfactory) {
 map<string, schedulingconfigurer> beans =
  ((listablebeanfactory) this.beanfactory).getbeansoftype(schedulingconfigurer.class);
 list<schedulingconfigurer> configurers = new arraylist<>(beans.values());
 annotationawareordercomparator.sort(configurers);
 for (schedulingconfigurer configurer : configurers) {
  configurer.configuretasks(this.registrar);
 }
 }

 if (this.registrar.hastasks() && this.registrar.getscheduler() == null) {
 assert.state(this.beanfactory != null, "beanfactory must be set to find scheduler by type");
 try {
  // search for taskscheduler bean...
  this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, false));
 }
 catch (nouniquebeandefinitionexception ex) {
  logger.trace("could not find unique taskscheduler bean", ex);
  try {
  this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, true));
  }
  catch (nosuchbeandefinitionexception ex2) {
  if (logger.isinfoenabled()) {
   logger.info("more than one taskscheduler bean exists within the context, and " +
    "none is named 'taskscheduler'. mark one of them as primary or name it 'taskscheduler' " +
    "(possibly as an alias); or implement the schedulingconfigurer interface and call " +
    "scheduledtaskregistrar#setscheduler explicitly within the configuretasks() callback: " +
    ex.getbeannamesfound());
  }
  }
 }
 catch (nosuchbeandefinitionexception ex) {
  logger.trace("could not find default taskscheduler bean", ex);
  // search for scheduledexecutorservice bean next...
  try {
  this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, false));
  }
  catch (nouniquebeandefinitionexception ex2) {
  logger.trace("could not find unique scheduledexecutorservice bean", ex2);
  try {
   this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, true));
  }
  catch (nosuchbeandefinitionexception ex3) {
   if (logger.isinfoenabled()) {
   logger.info("more than one scheduledexecutorservice bean exists within the context, and " +
    "none is named 'taskscheduler'. mark one of them as primary or name it 'taskscheduler' " +
    "(possibly as an alias); or implement the schedulingconfigurer interface and call " +
    "scheduledtaskregistrar#setscheduler explicitly within the configuretasks() callback: " +
    ex2.getbeannamesfound());
   }
  }
  }
  catch (nosuchbeandefinitionexception ex2) {
  logger.trace("could not find default scheduledexecutorservice bean", ex2);
  // giving up -> falling back to default scheduler within the registrar...
  logger.info("no taskscheduler/scheduledexecutorservice bean found for scheduled processing");
  }
 }
 }

 this.registrar.afterpropertiesset();
}

  1、通过一系列的schedulingconfigurer动态配置scheduledtaskregistrar。

  2、向scheduledtaskregistrar注册一个taskscheduler(用于对runnable的任务进行调度,它包含有多种触发规则)。

  3、registrar.afterpropertiesset(),在这开始安排所有的定时任务开始执行了。

protected void scheduletasks() {
 if (this.taskscheduler == null) {
 this.localexecutor = executors.newsinglethreadscheduledexecutor();
 this.taskscheduler = new concurrenttaskscheduler(this.localexecutor);
 }
 if (this.triggertasks != null) {
 for (triggertask task : this.triggertasks) {
  addscheduledtask(scheduletriggertask(task));
 }
 }
 if (this.crontasks != null) {
 for (crontask task : this.crontasks) {
  addscheduledtask(schedulecrontask(task));
 }
 }
 if (this.fixedratetasks != null) {
 for (intervaltask task : this.fixedratetasks) {
  addscheduledtask(schedulefixedratetask(task));
 }
 }
 if (this.fixeddelaytasks != null) {
 for (intervaltask task : this.fixeddelaytasks) {
  addscheduledtask(schedulefixeddelaytask(task));
 }
 }
}

  1、triggertask:动态定时任务。通过trigger#nextexecutiontime 给定的触发上下文确定下一个执行时间。

  2、crontask:动态定时任务,triggertask子类。通过cron表达式确定的时间触发下一个任务执行。

  3、intervaltask:一定时间延迟之后,周期性执行的任务。

  4、taskscheduler 如果为空,默认是concurrenttaskscheduler,并使用默认单线程的scheduledexecutor。

v三、主要看一下crontask工作原理

scheduledtaskregistrar.java
@nullable
public scheduledtask schedulecrontask(crontask task) {
 scheduledtask scheduledtask = this.unresolvedtasks.remove(task);
 boolean newtask = false;
 if (scheduledtask == null) {
 scheduledtask = new scheduledtask(task);
 newtask = true;
 }
 if (this.taskscheduler != null) {
 scheduledtask.future = this.taskscheduler.schedule(task.getrunnable(), task.gettrigger());
 }
 else {
 addcrontask(task);
 this.unresolvedtasks.put(task, scheduledtask);
 }
 return (newtask ? scheduledtask : null);
}

concurrenttaskscheduler.java
@override
@nullable
public scheduledfuture<?> schedule(runnable task, trigger trigger) {
 try {
 if (this.enterpriseconcurrentscheduler) {
  return new enterpriseconcurrenttriggerscheduler().schedule(decoratetask(task, true), trigger);
 }
 else {
  errorhandler errorhandler =
   (this.errorhandler != null ? this.errorhandler : taskutils.getdefaulterrorhandler(true));
  return new reschedulingrunnable(task, trigger, this.scheduledexecutor, errorhandler).schedule();
 }
 }
 catch (rejectedexecutionexception ex) {
 throw new taskrejectedexception("executor [" + this.scheduledexecutor + "] did not accept task: " + task, ex);
 }
}

reschedulingrunnable.java
@nullable
public scheduledfuture<?> schedule() {
 synchronized (this.triggercontextmonitor) {
 this.scheduledexecutiontime = this.trigger.nextexecutiontime(this.triggercontext);
 if (this.scheduledexecutiontime == null) {
  return null;
 }
 long initialdelay = this.scheduledexecutiontime.gettime() - system.currenttimemillis();
 this.currentfuture = this.executor.schedule(this, initialdelay, timeunit.milliseconds);
 return this;
 }
}

private scheduledfuture<?> obtaincurrentfuture() {
 assert.state(this.currentfuture != null, "no scheduled future");
 return this.currentfuture;
}

@override
public void run() {
 date actualexecutiontime = new date();
 super.run();
 date completiontime = new date();
 synchronized (this.triggercontextmonitor) {
 assert.state(this.scheduledexecutiontime != null, "no scheduled execution");
 this.triggercontext.update(this.scheduledexecutiontime, actualexecutiontime, completiontime);
 if (!obtaincurrentfuture().iscancelled()) {
  schedule();
 }
 }
}

  1、最终将task和trigger都封装到了reschedulingrunnable中。

  2、reschedulingrunnable实现了任务重复调度(schedule方法中调用调度器executor并传入自身对象,executor会调用run方法,run方法又调用了schedule方法)。

  3、reschedulingrunnable schedule方法加了同步锁,只能有一个线程拿到下次执行时间并加入执行器的调度。

  4、不同的reschedulingrunnable对象之间在线程池够用的情况下是不会相互影响的,也就是说满足线程池的条件下,taskscheduler的schedule方法的多次调用是可以交叉执行的。

scheduledthreadpoolexecutor.java
public scheduledfuture<?> schedule(runnable command,
     long delay,
     timeunit unit) {
 if (command == null || unit == null)
 throw new nullpointerexception();
 runnablescheduledfuture<?> t = decoratetask(command,
 new scheduledfuturetask<void>(command, null,
     triggertime(delay, unit)));
 delayedexecute(t);
 return t;
}


private void delayedexecute(runnablescheduledfuture<?> task) {
 if (isshutdown())
 reject(task);
 else {
 super.getqueue().add(task);
 if (isshutdown() &&
  !canrunincurrentrunstate(task.isperiodic()) &&
  remove(task))
  task.cancel(false);
 else
  ensureprestart();
 }
}

  scheduledfuturetask 工作原理如下图所示【太懒了,不想画图了,盗图一张】。

Spring boot定时任务的原理及动态创建详解 

  1、scheduledfuturetask会放入优先阻塞队列:scheduledthreadpoolexecutor.delayedworkqueue(二叉最小堆实现)

  2、上图中的thread对象即threadpoolexecutor.worker,实现了runnable接口

/**
 * creates with given first task and thread from threadfactory.
 * @param firsttask the first task (null if none)
 */
worker(runnable firsttask) {
 setstate(-1); // inhibit interrupts until runworker
 this.firsttask = firsttask;
 this.thread = getthreadfactory().newthread(this);
}

/** delegates main run loop to outer runworker */
public void run() {
 runworker(this);
}

  1、worker中维护了thread对象,thread对象的runnable实例即worker自身

  2、threadpoolexecutor#addworker方法中会创建worker对象,然后拿到worker中的thread实例并start,这样就创建了线程池中的一个线程实例

  3、worker的run方法会调用threadpoolexecutor#runworker方法,这才是任务最终被执行的地方,该方法示意如下

  (1)首先取传入的task执行,如果task是null,只要该线程池处于运行状态,就会通过gettask方法从workqueue中取任务。threadpoolexecutor的execute方法会在无法产生core线程的时候向  workqueue队列中offer任务。
gettask方法从队列中取task的时候会根据相关配置决定是否阻塞和阻塞多久。如果gettask方法结束,返回的是null,runworker循环结束,执行processworkerexit方法。
至此,该线程结束自己的使命,从线程池中“消失”。

  (2)在开始执行任务之前,会调用worker的lock方法,目的是阻止task正在被执行的时候被interrupt,通过调用clearinterruptsfortaskrun方法来保证的(后面可以看一下这个方法),该线程没有自己的interrupt set了。

  (3)beforeexecute和afterexecute方法用于在执行任务前后执行一些自定义的操作,这两个方法是空的,留给继承类去填充功能。

我们可以在beforeexecute方法中抛出异常,这样task不会被执行,而且在跳出该循环的时候completedabruptly的值是true,表示the worker died due to user exception,会用decrementworkercount调整wc。

  (4)因为runnable的run方法不能抛出throwables异常,所以这里重新包装异常然后抛出,抛出的异常会使当当前线程死掉,可以在afterexecute中对异常做一些处理。

  (5)afterexecute方法也可能抛出异常,也可能使当前线程死掉。

v四、动态创建定时任务

v  taskconfiguration 配置类

@configuration
@enablescheduling
@role(beandefinition.role_infrastructure)
public class taskconfiguration {

 @bean(name = scheduledannotationbeanpostprocessor.default_task_scheduler_bean_name)
 @role(beandefinition.role_infrastructure)
 public scheduledexecutorservice scheduledannotationprocessor() {
 return executors.newscheduledthreadpool(5, new defaultthreadfactory());
 }

 private static class defaultthreadfactory implements threadfactory {
 private static final atomicinteger poolnumber = new atomicinteger(1);
 private final threadgroup group;
 private final atomicinteger threadnumber = new atomicinteger(1);
 private final string nameprefix;

 defaultthreadfactory() {
  securitymanager s = system.getsecuritymanager();
  group = (s != null) ? s.getthreadgroup() :
   thread.currentthread().getthreadgroup();
  nameprefix = "pool-" +
   poolnumber.getandincrement() +
   "-schedule-";
 }

 @override
 public thread newthread(runnable r) {
  thread t = new thread(group, r,
   nameprefix + threadnumber.getandincrement(),
   0);
  if (t.isdaemon()) {
  t.setdaemon(false);
  }
  if (t.getpriority() != thread.norm_priority) {
  t.setpriority(thread.norm_priority);
  }
  return t;
 }
 }
}

  1、保证concurrenttaskscheduler不使用默认单线程的scheduledexecutor,而是corepoolsize=5的线程池

  2、自定义线程池工厂类

v  dynamictask 动态定时任务

@configuration
public class dynamictask implements schedulingconfigurer {
 private static logger logger = loggerfactory.getlogger(dynamictask.class);

 private static final executorservice es = new threadpoolexecutor(10, 20,
   0l, timeunit.milliseconds,
   new linkedblockingqueue<>(10),
   new dynamictaskconsumethreadfactory());


 private volatile scheduledtaskregistrar registrar;
 private final concurrenthashmap<string, scheduledfuture<?>> scheduledfutures = new concurrenthashmap<>();
 private final concurrenthashmap<string, crontask> crontasks = new concurrenthashmap<>();

 private volatile list<taskconstant> taskconstants = lists.newarraylist();

 @override
 public void configuretasks(scheduledtaskregistrar registrar) {
  this.registrar = registrar;
  this.registrar.addtriggertask(() -> {
     if (!collectionutils.isempty(taskconstants)) {
      logger.info("检测动态定时任务列表...");
      list<timingtask> tts = new arraylist<>();
      taskconstants
        .foreach(taskconstant -> {
         timingtask tt = new timingtask();
         tt.setexpression(taskconstant.getcron());
         tt.settaskid("dynamic-task-" + taskconstant.gettaskid());
         tts.add(tt);
        });
      this.refreshtasks(tts);
     }
    }
    , triggercontext -> new periodictrigger(5l, timeunit.seconds).nextexecutiontime(triggercontext));
 }


 public list<taskconstant> gettaskconstants() {
  return taskconstants;
 }

 private void refreshtasks(list<timingtask> tasks) {
  //取消已经删除的策略任务
  set<string> taskids = scheduledfutures.keyset();
  for (string taskid : taskids) {
   if (!exists(tasks, taskid)) {
    scheduledfutures.get(taskid).cancel(false);
   }
  }
  for (timingtask tt : tasks) {
   string expression = tt.getexpression();
   if (stringutils.isblank(expression) || !cronsequencegenerator.isvalidexpression(expression)) {
    logger.error("定时任务dynamictask cron表达式不合法: " + expression);
    continue;
   }
   //如果配置一致,则不需要重新创建定时任务
   if (scheduledfutures.containskey(tt.gettaskid())
     && crontasks.get(tt.gettaskid()).getexpression().equals(expression)) {
    continue;
   }
   //如果策略执行时间发生了变化,则取消当前策略的任务
   if (scheduledfutures.containskey(tt.gettaskid())) {
    scheduledfutures.remove(tt.gettaskid()).cancel(false);
    crontasks.remove(tt.gettaskid());
   }
   crontask task = new crontask(tt, expression);
   scheduledfuture<?> future = registrar.getscheduler().schedule(task.getrunnable(), task.gettrigger());
   crontasks.put(tt.gettaskid(), task);
   scheduledfutures.put(tt.gettaskid(), future);
  }
 }

 private boolean exists(list<timingtask> tasks, string taskid) {
  for (timingtask task : tasks) {
   if (task.gettaskid().equals(taskid)) {
    return true;
   }
  }
  return false;
 }

 @predestroy
 public void destroy() {
  this.registrar.destroy();
 }

 public static class taskconstant {
  private string cron;
  private string taskid;

  public string getcron() {
   return cron;
  }

  public void setcron(string cron) {
   this.cron = cron;
  }

  public string gettaskid() {
   return taskid;
  }

  public void settaskid(string taskid) {
   this.taskid = taskid;
  }
 }

 private class timingtask implements runnable {
  private string expression;

  private string taskid;

  public string gettaskid() {
   return taskid;
  }

  public void settaskid(string taskid) {
   this.taskid = taskid;
  }

  @override
  public void run() {
   //设置队列大小10
   logger.error("当前crontask: " + this);
   dynamicblockingqueue queue = new dynamicblockingqueue(3);
   es.submit(() -> {
    while (!queue.isdone() || !queue.isempty()) {
     try {
      string content = queue.poll(500, timeunit.milliseconds);
      if (stringutils.isblank(content)) {
       return;
      }
      logger.info("dynamicblockingqueue 消费:" + content);
      timeunit.milliseconds.sleep(500);
     } catch (interruptedexception e) {
      e.printstacktrace();
     }
    }
   });

   //队列放入数据
   for (int i = 0; i < 5; ++i) {
    try {
     queue.put(string.valueof(i));
     logger.info("dynamicblockingqueue 生产:" + i);
    } catch (interruptedexception e) {
     e.printstacktrace();
    }
   }
   queue.setdone(true);
  }

  public string getexpression() {
   return expression;
  }

  public void setexpression(string expression) {
   this.expression = expression;
  }

  @override
  public string tostring() {
   return reflectiontostringbuilder.tostring(this
     , tostringstyle.json_style
     , false
     , false
     , timingtask.class);
  }

 }

 /**
  * 队列消费线程工厂类
  */
 private static class dynamictaskconsumethreadfactory implements threadfactory {
  private static final atomicinteger poolnumber = new atomicinteger(1);
  private final threadgroup group;
  private final atomicinteger threadnumber = new atomicinteger(1);
  private final string nameprefix;

  dynamictaskconsumethreadfactory() {
   securitymanager s = system.getsecuritymanager();
   group = (s != null) ? s.getthreadgroup() :
     thread.currentthread().getthreadgroup();
   nameprefix = "pool-" +
     poolnumber.getandincrement() +
     "-dynamic-task-";
  }

  @override
  public thread newthread(runnable r) {
   thread t = new thread(group, r,
     nameprefix + threadnumber.getandincrement(),
     0);
   if (t.isdaemon()) {
    t.setdaemon(false);
   }
   if (t.getpriority() != thread.norm_priority) {
    t.setpriority(thread.norm_priority);
   }
   return t;
  }
 }

 private static class dynamicblockingqueue extends linkedblockingqueue<string> {
  dynamicblockingqueue(int capacity) {
   super(capacity);
  }


  private volatile boolean done = false;

  public boolean isdone() {
   return done;
  }

  public void setdone(boolean done) {
   this.done = done;
  }
 }
}

  1、taskconstants 动态任务列表

  2、scheduledtaskregistrar#addtriggertask 添加动态周期定时任务,检测动态任务列表的变化

crontask task = new crontask(tt, expression);
scheduledfuture<?> future = registrar.getscheduler().schedule(task.getrunnable(), task.gettrigger());
crontasks.put(tt.gettaskid(), task);
scheduledfutures.put(tt.gettaskid(), future);

  3、动态创建cron定时任务,拿到scheduledfuture实例并缓存起来

  4、在刷新任务列表时,通过缓存的scheduledfuture实例和crontask实例,来决定是否取消、移除失效的动态定时任务。

v  dynamictasktest 动态定时任务测试类

@runwith(springrunner.class)
@springboottest
public class dynamictasktest {

 @autowired
 private dynamictask dynamictask;

 @test
 public void test() throws interruptedexception {
  list<dynamictask.taskconstant> taskconstans = dynamictask.gettaskconstants();
  dynamictask.taskconstant taskconstant = new dynamictask.taskconstant();
  taskconstant.setcron("0/5 * * * * ?");
  taskconstant.settaskid("test1");
  taskconstans.add(taskconstant);


  dynamictask.taskconstant taskconstant1 = new dynamictask.taskconstant();
  taskconstant1.setcron("0/5 * * * * ?");
  taskconstant1.settaskid("test2");
  taskconstans.add(taskconstant1);

  dynamictask.taskconstant taskconstant2 = new dynamictask.taskconstant();
  taskconstant2.setcron("0/5 * * * * ?");
  taskconstant2.settaskid("test3");
  taskconstans.add(taskconstant2);

  timeunit.seconds.sleep(40);
  //移除并添加新的配置
  taskconstans.remove(taskconstans.size() - 1);
  dynamictask.taskconstant taskconstant3 = new dynamictask.taskconstant();
  taskconstant3.setcron("0/5 * * * * ?");
  taskconstant3.settaskid("test4");
  taskconstans.add(taskconstant3);
//
  timeunit.minutes.sleep(50);
 }
}

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。