欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

自定义一个具有暂停功能的线程池

程序员文章站 2022-03-25 21:56:15
...

示例代码

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 自定义一个具有暂停功能的线程池
 */
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    //暂停标志
    private static boolean pause = false;
    //定义一个锁,保证多线程情况下的线程安全
    private static final ReentrantLock lock = new ReentrantLock();
    //使用Condition
    private static Condition condition = lock.newCondition();

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (pause) {
                condition.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 暂停方法
     */
    public void pause() {
        lock.lock();
        try {
            pause = true;
        } finally {
            lock.unlock();
        }
    }

    public void start() {
        lock.lock();
        try {
            pause = false;
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

试下这个具有暂停功能的线程池:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MyThreadPoolExecutorTest {

    static MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    
    final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(sdf.format(new Date()) + "运行任务");
        }
    }

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 6; i++) {
            myThreadPoolExecutor.execute(new Task());
        }
        TimeUnit.SECONDS.sleep(2);
        myThreadPoolExecutor.pause();
        //睡眠一秒钟,方便看出结果
        TimeUnit.SECONDS.sleep(1);
        System.out.println(sdf.format(new Date()) + "线程池被暂停");
        TimeUnit.SECONDS.sleep(6);
        myThreadPoolExecutor.start();
        System.out.println(sdf.format(new Date()) + "线程池开始运行");
        myThreadPoolExecutor.shutdown();
    }
}

运行结果:

自定义一个具有暂停功能的线程池

可以看到在10:47:25时,线程池被暂停,直到10:47:31线程池开始运行前,中间没有任何线程任务在执行,实现了一个具有暂停功能的线程池。

总结

这里只是重写了ThreadPoolExecutor类的一个beforeExecute()方法就实现了这个暂停的功能,其实其中还有很多方法都能重写,比如通过重写beforeExecute()和afterExecute()方法就能够实现在线程池提交任务前后记录日志的功能,方法可以被重写,就代表了无限的可能性。

相关标签: Java并发编程