欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JUC之线程池

程序员文章站 2022-07-13 14:42:47
...

创建线程池由工厂类Executors完成。其中newCachedThreadPool,newFixedThreadPool,newSingleThreadExecutor对应的实现类是ThreadPoolExecutor,newScheduledThreadPool对应的实现类是ScheduledThreadPoolExecutor。

ThreadPoolExecutor

JUC之线程池

  • int corePoolSize 线程池的最小线程数目,即使线程闲置,这些线程也一直保留
  • int maximumPoolSize 线程池中最大线程数目
  • long keepAliveTime 当线程池中的线程超过corePoolSize时,当超过该事件设定的时长线程一直处于闲置状态时,线程将被回收
  • TimeUnit unit 上述值对应的时间单位
  • BlockingQueue workQueue 一个保留Runable对象的阻塞队列。
  • ThreadFactory threadFactory 用于创建线程的工厂。该值可以缺省,此时使用的是默认的线程创建工厂。
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);//设置为非守护线程
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);//设置为正常优先级
            return t;
        }
    }
  • RejectedExecutionHandler handler 当线程池中的线程已经达到maximumPoolSize同时BlockingQueue也已经满员时,调用该handler。该值可以缺省,将采用如下默认实现
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

ThreadPoolExecutor该类中,最核心的方法应该就是execute()了。

  • 如果当前线程数少于corePoolSize,那么就直接新建线程。无非是JDK基本的线程操作代码。
            w = new Worker(firstTask);
            final Thread t = w.thread;
            t.start();
  • 将命令加入队列。在加入队列后,我们仍然需要确认一下是否还有存活线程。如果没有了,还需要重新创建线程。

  • 如果我们不能成功的把命令加入队列,也不能新建线程,那么就只能reject这个命令

完整逻辑如下

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //isRunning(c)用于检查是否处于shutdown状态
        if (isRunning(c) && workQueue.offer(command)) {
            //再次确认
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);

ScheduledThreadPoolExecutor

JUC之线程池
构造函数如下:

    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor。核心是使用DelayedWorkQueue。DelayedWorkQueue是在该类内部定义的一个静态内部类,实现了BlockingQueue接口。
首先你需要知道算法,堆可以用来实现优先队列。DelayedWorkQueue就是使用了堆,插入元素使用siftUp方法,移除元素使用siftDown方法。在该对象中,维护了一个Condition。通过available.awaitNanos(delay)来消耗时间。

    private final Condition available = lock.newCondition();


        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    //如果为空则等待通知
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        //到时则出对
                        if (delay <= 0)
                            //finishPoll用于在出队时调整堆结构
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }