欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java实现终止线程池中正在运行的定时任务

程序员文章站 2024-03-31 10:24:46
最近项目中遇到了一个新的需求,就是实现一个可以动态添加定时任务的功能。说到这里,有人可能会说简单啊,使用quartz就好了,简单粗暴。然而quartz框架太重了,小项目根本...

最近项目中遇到了一个新的需求,就是实现一个可以动态添加定时任务的功能。说到这里,有人可能会说简单啊,使用quartz就好了,简单粗暴。然而quartz框架太重了,小项目根本不好操作啊。当然,也有人会说,jdk提供了timer的接口啊,完全够用啊。但是我们项目的需求完全是多线程的模型啊,而timer是单线程的,so,楼主最后还是选择了jdk的线程池。

线程池是什么

java通过executors提供四种线程池,分别为:
newcachedthreadpool :创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newfixedthreadpool : 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newscheduledthreadpool : 创建一个定长线程池,支持定时及周期性任务执行。
newsinglethreadexecutor : 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(fifo, lifo, 优先级)执行。

楼主项目中用到的是newscheduledthreadpool, 就这些吧,再多的楼主就班门弄斧了,google一下,一大堆。

线程池service的获取

楼主通过单例模式来获取线程池的service,代码如下:

/**
 * 线程池创建.
 * @author wuhf
 * @date 2018/01/16
 */
public class threadpoolutils {

  private static scheduledexecutorservice executorservice;

  private threadpoolutils() {
    //手动创建线程池.
    executorservice = new scheduledthreadpoolexecutor(10,
        new basicthreadfactory.builder().namingpattern("syncdata-schedule-pool-%d").daemon(true).build());
  }

  private static class pluginconfigholder {
    private final static threadpoolutils instance = new threadpoolutils();
  }

  public static threadpoolutils getinstance() {
    return pluginconfigholder.instance;
  }

  public scheduledexecutorservice getthreadpool(){
    return executorservice;
  }

}

中断某一个正在运行的线程代码实现

废话就不多说了,代码如下:

/**
 * 中断线程池的某个任务.
 */
public class interruptthread implements runnable {

  private int num;

  public interruptthread (int num){
    this.num = num;
  }

  public static void main(string[] args) throws interruptedexception {

    thread interruptthread = new thread(new interruptthread(1));
    scheduledfuture<?> t = threadpoolutils.getinstance().getthreadpool().scheduleatfixedrate(interruptthread,0,2,
        timeunit.seconds);

    interruptthread interruptthread1 = new interruptthread(2);
    threadpoolutils.getinstance().getthreadpool().scheduleatfixedrate(interruptthread1,0,2,
        timeunit.seconds);

    interruptthread interruptthread2 = new interruptthread(3);
    threadpoolutils.getinstance().getthreadpool().scheduleatfixedrate(interruptthread2,0,2,
        timeunit.seconds);
    thread.sleep(5000);

		//终止正在运行的线程interruptthread
    t.cancel(true);
    while (true){

    }
  }

  @override
  public void run() {
    system.out.println("this is a thread" + num);
  }
}

踩坑记录

楼主在使用如下代码时,突然想到当这个定时任务需要被停止时该如何停止线程运行

threadpoolutils.getinstance().getthreadpool().scheduleatfixedrate(interruptthread,0,2,
        timeunit.seconds);

既然我有这样的需求,那就google一下吧,找了大半圈,愣是没找到相关资料,都是一些关于java线程池的深入分析。或者是全局变量啥的,并没有找到令楼主满意的解决方案。

既然没有线程的那就扒一下scheduleatfixedrate的底层源码看看是什么东西吧,果不其然我在源码中看到了scheduleatfixedrate方法的具体实现,发现他的返回值是scheduledfuture。

public scheduledfuture<?> scheduleatfixedrate(runnable command,
                         long initialdelay,
                         long period,
                         timeunit unit) {
    if (command == null || unit == null)
      throw new nullpointerexception();
    if (period <= 0)
      throw new illegalargumentexception();
    scheduledfuturetask<void> sft =
      new scheduledfuturetask<void>(command,
                     null,
                     triggertime(initialdelay, unit),
                     unit.tonanos(period));
    runnablescheduledfuture<void> t = decoratetask(command, sft);
    sft.outertask = t;
    delayedexecute(t);
    return t;
  }

接着往下我们再看看scheduledfuture里面有什么东西吧,没有让楼主失望,看到了这个

public boolean cancel(boolean mayinterruptifrunning) {
      boolean cancelled = super.cancel(mayinterruptifrunning);
      if (cancelled && removeoncancel && heapindex >= 0)
        remove(this);
      return cancelled;
}
      
//从线程的运行队列中移除当前线程
public boolean remove(runnable task) {
    boolean removed = workqueue.remove(task);
    tryterminate(); // in case shutdown and now empty
    return removed;
}

再往上查super.cancel(mayinterruptifrunning)是什么东西,我们看到了这个,

//通过调用线程的interrupt方法终止线程运行
public boolean cancel(boolean mayinterruptifrunning) {
    if (!(state == new &&
       unsafe.compareandswapint(this, stateoffset, new,
         mayinterruptifrunning ? interrupting : cancelled)))
      return false;
    try {  // in case call to interrupt throws exception
      if (mayinterruptifrunning) {
        try {
          thread t = runner;
          if (t != null)
            t.interrupt();
        } finally { // final state
          unsafe.putorderedint(this, stateoffset, interrupted);
        }
      }
    } finally {
      finishcompletion();
    }
    return true;
  }

到这里所有的问题都迎刃而解。

总结一下吧

项目中总是会遇到比较难搞的解决方案,当google不太好找时,翻一下jdk的源码或许也是一个不错的方法。