欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java线程池--初步了解线程池

程序员文章站 2022-05-04 17:53:48
...

当时觉得自己菜,写的没深度,放弃了。这次希望自己坚持下去,加油

博客主页:知识积累

个人网站:个人网站

今天我们简单来学习一下java线程池的基本知识,了解一个简单的运行流程。

线程池的流程图

java线程池--初步了解线程池

大概就是图中这几步。

jdk版本为1.8 

注意看类的包路径,我一开始就看错了类,看到了tomcat里面的线程池去了,而且类名什么的还都一样,实现是不一样的。

java的线程池的类都在 package java.util.concurrent 包***意别看错了!

1 创建线程池

1.1 了解参数
下面是一个创建线程池的代码
ExecutorService exec = new ThreadPoolExecutor(20, 100,1, TimeUnit.MINUTES,new ArrayBlockingQueue(100));

简单要了解一下线程池到底有哪些参数
下面是线程池所有入参的构造函数,我们来一一说明每个参数的含义

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {....}

corePoolSize:核心线程数量。核心线程即使空闲也不会被回收。
maximumPoolSize:最大线程数。即线程池中最多,只能存放这么多数量的线程,所以核心线程数量+其他线程数量 < 最大线程数量
keepAliveTime:线程空闲时存活的时间。一旦非核心线程出现空闲,再指定时间内还是没有被使用,就会被回收。
unit:上述线程存活时间的单位。
workQueue:阻塞队列。提交任务时,所有的核心线程被其他任务占用,没有空闲的核心形成,那么会将这个任务存放到阻塞队列当中。直到有空闲的核心线程来执行阻塞队列中的任务。
threadFactory:创建线程池中线程对象的工厂。
handler:拒绝策略。 当提交任务时,此时核心线程都在被其他任务占用,阻塞队列的也被提交是任务占满,而此时创建的其他线程也达到了总线程数的上线,那么就会将该任务交给拒绝策略来处理。

简单了解几个常用的阻塞队列
ArrayBlockingQueue
:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue,静态工厂方法 Executors.newFixedThreadPool()使用了这个队列。
SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个列。
PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

简单了解几个常用的拒绝策略
AbortPolicy
:不处理,直接抛出异常。
CallerRunsPolicy:若线程池还没关闭,调用当前所在线程来运行任务,r.run()执行。
DiscardOldestPolicy:LRU策略,丢弃队列里最近最久不使用的一个任务,并执行当前任务。
DiscardPolicy:不处理,丢弃掉,不抛出异常。

线程池也是有状态的,线程池很多操作都是根据对线程池状态和数量执行CAS操作,来保证线性安全。
线程池状态是一个AtomicInteger类型的整数,用高3位表示线程池的运行状态,低29位表示线程池的线程数量。
简单介绍一下状态
RUNNING:高3位为111,运行中,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
SHUTDOWN:高3位为000,关闭,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
STOP :高3位为001,停止,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
TIDYING :高3位为010,整理, 所有的任务都已经终止;
TERMINATED:高3位为011,终止, terminated()方法已经执行完成

1.2 具体创建流程

1.2.1 简单介绍几个类中的重要参数
ctl:控制器

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

这个整形变量的高3位表示线程池的状态,低29位表示线程池中线程的数量。通过cas对该值的修改,来实现并发时的线性安全。

ftask :可以获得返回值的任务
RunnableFuture<Void> ftask = newTaskFor(task, null);

我们往线程池提交任务的时,会先将任务转成RunnableFuture,然后才调用添加任务的接口

workers :线程池中工作线程的容器
private final HashSet<Worker> workers = new HashSet<Worker>();

Worker:线程池中创建的线程最终都会封装成worker,我们通过worker去执行我们提交的任务。
该类继承了同步器AbstractQueuedSynchronizer和继承了线程类Runnable

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    //序列化id
    private static final long serialVersionUID = 6138294804551838833L;

    //持有的线程
    final Thread thread;
    //线程的任务
    Runnable firstTask;
    //线程的计数器,统计该worker总共完成了多少个任务
    volatile long completedTasks;

    //构造
    Worker(Runnable firstTask) {
        //同步状态设置为-1
        setState(-1); 
        this.firstTask = firstTask;
        //用创建线程池传入的线程工厂创建线程
        this.thread = getThreadFactory().newThread(this);
    }

    //运行worker
    public void run() {
        //运行worker
        runWorker(this);
    }
    
    /*------------------------------下面都是重写同步器的方法-----------------------------------------*/
    //判断同步状态是否不是0
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }
    //试图获取锁
    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            //上面cas执行成功,说明该线程已经获得了这个同步状态,那么将同步器的exclusiveOwnerThread设置为当前线程。
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
    //试图释放锁
    protected boolean tryRelease(int unused) {
        //释放锁,说明没有线程获得同步状态,那么就将同步器的exclusiveOwnerThread设置为null
        setExclusiveOwnerThread(null);
        //同步状态设置为0
        setState(0);
        return true;
    }

    public void lock()        { acquire(1); }
    public boolean tryLock()  { return tryAcquire(1); }
    public void unlock()      { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }
    //如果线程池启动时被中断,执行下面代码
    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}


1.2.2 查看具体代码流程
创建线程池

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    //调用最终构造函数
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}

由上可知,如果不传拒绝策略,默认使用了AbortPolicy,即什么都不做

private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();


接下来简单看看 最终的构造函数干了什么

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    //参数校验
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    //启用Java安全管理器,获取系统安全接口,防止恶意代码的运行。
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    //转个时间,单位纳秒!!!!!
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

由上可知,就是给各种全局变量赋值。

2 往线程池中添加任务
线程池有两个方法可以提交任务,execute()和submit():
区别在于,submit是可以接受返回值,而execute不可以。

//创建线程池
ExecutorService executorService= new ThreadPoolExecutor(20, 100,
        1, TimeUnit.MINUTES,
        new ArrayBlockingQueue(100));
//往线程池中添加任务
executorService.execute(new Runnable() {
    @Override
    public void run() {
        System.out.println("1线程名:" + Thread.currentThread().getName());
    }
});
executorService.submit(new Runnable() {
    @Override
    public void run() {
        System.out.println("2线程名:" + Thread.currentThread().getName());
    }
});


2.1 看submit和execute的区别
submit的底层调用的还是execute,在调用execute之前,将传入的thread转成了RunnableFuture

AbstractExecutorService.submit():

public Future<?> submit(Runnable task) {
    //添加的任务不能为空
    if (task == null) throw new NullPointerException();
    //创建一个RunnableFuture,RunnableFuture这个类继承了Runnable和Future<V>,可以通过这个对象获取任何运行成功的返回值
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    //往线程池中添加封装后的ftask
    execute(ftask);
    return ftask;
}

在提交任务的时候,将我们提交的任务封装成一个RunnableFuture
我们先来看看RunnableFuture到底是个什么东西
RunnableFuture

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

RunnableFuture基础了Runnable和Future,那么他本身可以作为线程,Future允许方法获得该异步线程执行后的结果。
为什么要使用RunnableFuture,因为线程池也可以通过该类中,使用的是RunnableFuture的子类FutureTask

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
    return new FutureTask<T>(runnable, value);
}

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    //我们提交任务
    this.callable = callable;
    this.state = NEW;    
}


2.2 接下来看看executre方法。
ThreadPoolExecutor

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    //ctl就是那个高三位表示线程池状态,低29位表示线程池数量的那个ctl控制值
    int c = ctl.get();
    //1 如果当前线程数量小于核心线程数量,那么去创建核心线程来执行该任务
    if (workerCountOf(c) < corePoolSize) {
        //讲该任务添加到线程池中,并创建一个核心线程去执行他。
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    //2 核心线程已经满了,那么添加到阻塞队列中
    //判断状态是否是运行中,且添加元素到阻塞队列是否成功
    if (isRunning(c) && workQueue.offer(command)) {
        //再次获取ctl的值
        int recheck = ctl.get();
        //如果此时状态不是RUNNING,
        if (! isRunning(recheck) && remove(command))
            //回退操作
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3 如果阻塞队列也满了,那么就创建非核心线程来执行该任务
    else if (!addWorker(command, false))
        //回退操作
        reject(command);
}

由上可知,线程池添加任务的逻辑大概就是三步

 

 

1 如果当前线程数量小于核心线程数量,那么去创建核心线程来执行该任务
2 核心线程已经满了,那么添加到阻塞队列中
3 如果阻塞队列也满了,那么就创建非核心线程来执行该任务
下面一个个来分析2.2.1  如果当前线程数量小于核心线程数量,那么去创建核心线程来执行该任务,addWorker()
ThreadPoolExecutor

private boolean addWorker(Runnable firstTask, boolean core) {
    //1 通过死循环,用cas给线程池的线程数量+1,从而实现乐观锁,从而实现并发安全。
    //标志位,用于循环中的灵活跳转
    retry:
    for (;;) {
        //获取*控制值
        int c = ctl.get();
        //获取此时线程池的状态
        int rs = runStateOf(c);
        //如果线程池的状态不是运行中,
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            //获取此时线程池拥有线程的数量
            int wc = workerCountOf(c);
            //如果线程数量大于最大容量,直接返回失败
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            //通过cas给线程数量加1,成功就退出标志位retry所关联的循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            //cas失败,那么就获取此时线程池的ctl的值
            c = ctl.get();  // Re-read ctl
            //获取此时ctl的线程池状态,并且和方法初获取的线程池状态做比较。如果一样,继续在子循环中执行cas,如果不相等,回到retry关联的最外层循环,重新开始执行。
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    //2 根据线程工厂,往线程池中添加一个核心线程对象
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //新建一个worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            //获得锁,是个全局变量且final修饰的ReentrantLock。
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //获得此时线程池的状态
                int rs = runStateOf(ctl.get());
                //如果线程池的状态是小于SHUTDOWN即RUNNING,或者线程池的状态是关闭且任务是null
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    //线程是否是存活状态,是的话说明该worker已经被其他任务占用,抛出异常
                    if (t.isAlive()) 
                        throw new IllegalThreadStateException();
                    //将新建的worker添加到workers中
                    workers.add(w);
                    //获取worder的数量
                    int s = workers.size();
                    //将largestPoolSize 设置为当前worker的数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //设置标识,worker添加成功    
                    workerAdded = true;
                }
            } finally {
                //解锁
                mainLock.unlock();
            }
            if (workerAdded) {
                //运行worker的线程
                //这里要注意,t并不是简单的thread,而是上面所说被包装的RunnableFuture的子类FutureTask
                t.start();
                //设置标识,worker开启成功
                workerStarted = true;
            }
        }
    } finally {
        //如果开始失败,执行一些补偿操作
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}


2.2.2 核心线程已经满了,那么添加到阻塞队列中

//主要是offer(command),将任务添加到结点中
if (isRunning(c) && workQueue.offer(command)) {
        //再次获取ctl的值
        int recheck = ctl.get();
        //如果此时状态不是RUNNING,则移除该command
        if (! isRunning(recheck) && remove(command))
            //回退操作
            reject(command);
         //此时线程数是0
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }

这里主要观察阻塞队列的逻辑,具体如何消费阻塞队列的任务,不同的阻塞队列有不同的逻辑,我们用ArrayBlockQueue来为例子。
ArrayBlockQueue底层是基于数组实现的。
2.2.2.1 ArrayBlockQueue的初始化

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    //创建指定长度的数组
    this.items = new Object[capacity];
    //创建一个非公平锁
    lock = new ReentrantLock(fair);
    //创建condition,用于获取
    notEmpty = lock.newCondition();
    //创建condition,用于释放
    notFull =  lock.newCondition();
}

2.2.2.2 给队列添加数据

public boolean offer(E e) {
    //校验,非空
    checkNotNull(e);
    //获取lock
    final ReentrantLock lock = this.lock;
    //操作前加锁
    lock.lock();
    try {
        //如果队列以及满了,返回false
        if (count == items.length)
            return false;
        else {
            //给队列天机数据
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}



private void enqueue(E x) {
    final Object[] items = this.items;
    //指定数组指定位置,复制x
    items[putIndex] = x;
    //如果此时长度已经满了,将putIndex 设置为0
    if (++putIndex == items.length)
        putIndex = 0;
    /数量加1
    count++;
    //notEmpty唤醒
    notEmpty.signal();
}

疑问:

1 ArrayBlockQueue阻塞队列是如何消费任务的,全程是看到调用了offer方法,看到唤醒了notEmpty,但是全程没看到哪里调用了让notEmpty唤醒的方法。希望有谁知道告知一下。

 

2.2.3 如果阻塞队列也满了,那么就创建非核心线程来执行该任务,如果创建失败,执行拒绝策略

//3 如果阻塞队列也满了,那么就创建非核心线程来执行该任务
    else if (!addWorker(command, false))
        //如果添加非核心线程失败,则执行下面的拒绝策略
        reject(command);


final void reject(Runnable command) {
    //执行拒绝策略
    handler.rejectedExecution(command, this);
}

参考书籍《java并发编程的艺术》

 

 

 

 

 

 

 

 

 

 

坚持一件事情,本来就不简单