JAVA多线程之JUC总结
1.前置知识
1.1.线程与进程
进程是一个具备独立功能的程序的一次动态执行的过程。是操作系统进行资源分配和调度的独立单位。
早期的操作系统是没有线程这个概念的。进程就是拥有资源的运行的最小单位,也是程序运行的最小单位。随着计算机的发展。进程的上下文切换开销较大
所以发明了一个新的概念“线程”。线程是程序执行中一个单一的顺序执行流程。是程序执行的最小的单位,是处理器调度的分配的基本单位。
总结:
1.线程是执行的最小单位,进程是操作系统分配资源的最小单位
2.一个进程由一个线程或者多个线程组成,线程是一个进程中代码的不同执行路线(多线程)
3.线程的调度和切换比进程快得多
进程可以类比为QQ.exe,而线程则是QQ的聊天功能,视频功能的执行过程
1.2并行与并发
并行是指当系统有多个cpu的时候,多个线程可以在不同的cpu上同时执行,两个进程互不抢占cpu资源。可以同时进行
并发是指在一个时间段中,有多个进程都处于已启动到运行完毕的过程中,它们抢占cpu资源。cpu将一个时间段划分成为一个个的时间片(时间区间),然后在这个时间片中进行进程的来回切换执行。
1.3线程的状态
Thread.State
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW,(新建)
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,(准备就绪)
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,(阻塞)
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING,(不见不散)
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,(过时不候)
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;(终结)
}
2.Lock接口
2.1多线程编程模板
进行多线程编程的时候,在高内聚低耦合的情况下,编写线程 操作 资源类。例子代码在下
class Ticket {/*资源类*/
private Lock lock = new ReentrantLock();/*可重入锁*/
private int number = 300;
public void sale() {//操作(对外暴露的调用方法)
try {
lock.lock();
if (number > 0) {
System.out.println(Thread.currentThread().getName() + "卖出第" + number-- + "张票,还剩下" + number + "张票");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class SaleTicket {
public static void main(String[] args) {
Ticket ticket = new Ticket();
//线程
new Thread(() ->{ for (int i = 0; i < 300; i++) ticket.sale(); },"A").start();
new Thread(() ->{ for (int i = 0; i < 300; i++) ticket.sale(); },"B").start();
new Thread(() ->{ for (int i = 0; i < 300; i++) ticket.sale(); },"C").start();
}
}
2.2传统Synchroinezd实现线程的通信
首先需要明白两个概念:
1.等待池:当线程A调用某个对象的wait方法后,那么线程A就会释放锁并进入该对象的等待池。在等待池中的线程是不会去竞争锁的。
2.锁池:当某个线程调用该对象的notifyAll()方法时,处于该对象等待池中的所有对象都会进入锁池,锁池中的线程会去竞争锁。
在线程通信的时候 编写代码口诀 判断 干活 通知
多线程交互中,必须防止多线程的虚假唤醒,也即(判断只能用while,不能用if,根本原因是if之后,不会再进行判断)
下方的例子中如果使用if判断,当当前number值是1时,唤醒线程可能会唤醒执行add方法的线程,而且add方法的线程不会再执行判断,所以会增加为2。导致错误
例子代码实现对一个变量进行加一减一。
class AirConditioner{
private int number = 0;
public synchronized void add(){
//判断(满足条件跳出循环)
while (number != 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//干活(执行业务逻辑)
number ++;
System.out.println(Thread.currentThread().getName() + number);
//通知(唤醒其他线程)
this.notifyAll();
}
public synchronized void dec(){
//判断
while (number == 0) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//干活
number --;
System.out.println(Thread.currentThread().getName() + number);
//通知
this.notifyAll();
}
}
public class ThreadWaitNotifyDemo {
public static void main(String[] args) {
AirConditioner conditioner = new AirConditioner();
new Thread(()->{
for (int i = 0; i < 10; i++) {
conditioner.add();
}
},"同学A").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
conditioner.dec();
}
},"同学B").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
conditioner.add();
}
},"同学C").start();
new Thread(()->{
for (int i = 0; i < 10; i++) {
conditioner.dec();
}
},"同学D").start();
}
}
2.3Lock实现线程的通信
首先看下图
使用Lock后,synchronized中的wait和notify被await和signal方法代替了。
而下面代码的Condition又是什么呢?还记得刚刚讲的等待池和锁池的概念吗。他们有异曲同工之妙,但是Condition的功能更加强大。
class AirConditioner2 {
private int number = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
public void add() {
lock.lock();
try {
//判断
while (number != 0) {
condition.await();
}
//干活
number++;
System.out.println(Thread.currentThread().getName() + number);
//通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void dec() {
lock.lock();
try {
//判断
while (number == 0){
condition.await();
}
//干活
number --;
System.out.println(Thread.currentThread().getName() + number);
//通知
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ThreadWaitNotifyDemo2 {
public static void main(String[] args) {
AirConditioner2 conditioner = new AirConditioner2();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
conditioner.add();
}
}, "同学A").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
conditioner.dec();
}
}, "同学B").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
conditioner.add();
}
}, "同学C").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
conditioner.dec();
}
}, "同学D").start();
}
}
2.4Lock实现精准唤醒线程
第一个问题:如何保证顺序
第二个问题:如何保证唤醒的线程
解决:第一个问题使用一个标志位,定义一个变量来充当标志位,判断时使用该变量来判断属于哪一个线程
第二个问题可以使用Condition来实现精准唤醒。condition都有自己的等待队列。当await的时候,就将该线程放入自己的等待队列中。当使用signal()的时候唤醒当前condition队列中的线程
/* 多个线程之间按顺序调用,实现A->B->C
* AA打印五次,BB打印十次,CC打印十五次
* 接着
* AA打印五次,BB打印十次,CC打印十五次
来十轮*/
class ShareResource{
private int number = 1;//A:1 B:2 C:3 标志位
private Lock lock = new ReentrantLock();
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
/*设置三个condition是为了更加精确提升效率,每个condition都有自己的等待队列。当await的时候,就将该线程放入自己的等待队列中。
可以用当前condition.singal方法进行精确唤醒*/
public void print5(){
lock.lock();
try {
//判断
while (number != 1){
condition1.await();
}
//干活
for (int i = 0; i < 5; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
//通知
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10(){
lock.lock();
try {
//判断
while (number != 2){
condition2.await();
}
//干活
for (int i = 0; i < 10; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
//通知
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15(){
lock.lock();
try {
//判断
while (number != 3){
condition3.await();
}
//干活
for (int i = 0; i < 15; i++) {
System.out.println(Thread.currentThread().getName() + "\t" + i);
}
//通知
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class ZZ01ThreadOrderAccess {
public static void main(String[] args) {
ShareResource shareResource = new ShareResource();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
shareResource.print5();
}
},"A").start();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
shareResource.print15();
}
},"C").start();
new Thread(()->{
for (int i = 0; i <10 ; i++) {
shareResource.print10();
}
},"B").start();
}
}
总结
1.首先synchronized是java内置关键字,在jvm层面,Lock是个java类;
2.synchronized无法判断是否获取锁的状态,Lock可以判断是否获取到锁;
3.synchronized会自动释放锁(a 线程执行完同步代码会释放锁 ;b 线程执行过程中发生异常会释放锁),Lock需在finally中手工释放锁(unlock()方法释放锁),否则容易造成线程死锁;
4.用synchronized关键字的两个线程1和线程2,如果当前线程1获得锁,线程2线程等待。如果线程1阻塞,线程2则会一直等待下去,而Lock锁就不一定会等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了;
5.synchronized的锁可重入、不可中断、非公平,而Lock锁可重入、可判断、可公平(两者皆可)
6.Lock锁适合大量同步的代码的同步问题,synchronized锁适合代码少量的同步问题。
3.线程安全的集合类
3.1CopyOnWriteArrayList
先看看我们平时使用的ArrayList是如何在多线程环境报异常的
public static void listNotSafe1() {
//List<String> list = Collections.synchronizedList(new ArrayList<>());
List<String> list = new ArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
上述代码执行过程中会出现java.util.ConcurrentModificationException(并发修改异常)
导致此异常的原因是什么呢?
看看报错信息是迭代器出现了异常
private class Itr implements Iterator<E> {
int cursor; //下一个要访问元素的索引值
int lastRet = -1; //上一个访问元素的索引值
int expectedModCount = modCount;//修改次数的期望值,modCount表示ArrayList被修改的次数
Itr() {}
public boolean hasNext() {
return cursor != size;
}
@SuppressWarnings("unchecked")
public E next() {
checkForComodification();
int i = cursor;
if (i >= size)
throw new NoSuchElementException();
Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length)
throw new ConcurrentModificationException();
cursor = i + 1;
return (E) elementData[lastRet = i];
}
public void remove() {
if (lastRet < 0)
throw new IllegalStateException();
checkForComodification();
try {
ArrayList.this.remove(lastRet);
cursor = lastRet;
lastRet = -1;
expectedModCount = modCount;
} catch (IndexOutOfBoundsException ex) {
throw new ConcurrentModificationException();
}
}
@Override
@SuppressWarnings("unchecked")
public void forEachRemaining(Consumer<? super E> consumer) {
Objects.requireNonNull(consumer);
final int size = ArrayList.this.size;
int i = cursor;
if (i >= size) {
return;
}
final Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length) {
throw new ConcurrentModificationException();
}
while (i != size && modCount == expectedModCount) {
consumer.accept((E) elementData[i++]);
}
// update once at end of iteration to reduce heap write traffic
cursor = i;
lastRet = i - 1;
checkForComodification();
}
final void checkForComodification() {
//当对list的次改次数不等于期望的修改次数后,就会产生这个异常,比如多线程扩容的情况下
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
}
我们来看看线程安全的集合类
使用CopyOnWriteArrayList可以实现
public static void listNotSafe() {
//List<String> list = Collections.synchronizedList(new ArrayList<>());
List<String> list = new CopyOnWriteArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(list);
}, String.valueOf(i)).start();
}
}
看看CopyOnWriteArrayList是如何实现并发修改的(保证高并发的读,和写的时候数据一致性)。看看添加元素的源码
CopyOnWriteArrayList是一个写时复制的思想。当向容器中添加元素的时候,并不会向原容器中直接添加,而是复制出一个新的数组,将原有容器的元素添加到新的数组
Object[] newElements 中,并且长度为原有容器长度+1,然后向新的容器添加元素,最后setArray(newElements),将原容器的引用指向新的容器。
这样做的好处是,可以进行并发的读,而不需要加锁。只有对元素进行增删时才会进行加锁操作。所以CopyOnWriteArrayList也会一种读写分离的思想,读和写都是不同的容器
。
3.2CopyOnWriteArraySet
说句题外话。HashSet底层是HashMap。那么value值是什么呢?
value值是一个Object对象。为什么设置为Object,而不是null呢。null占用空间小,而且效率更高,究竟是为什么呢?
我们先来看看HashSet的remove方法
public boolean remove(Object o) {
return map.remove(o)==PRESENT;
}
可以看到它调用的是HashMap的remove方法,来吧。看看HashMap的remove方法
public V remove(Object key) {
Node<K,V> e;
return (e = removeNode(hash(key), key, null, false, true)) == null ?
null : e.value;
}
看到这应该就明白了,当删除某一个数据时。会返回被删除数据的value。如果HashSet的value设置为null的话。删除数据永远返回null。从而无法判断是否删除成功。
CopyOnWriteArraySet又是如何把保证线程安全的呢?
看看它的构造方法
public class CopyOnWriteArraySet<E> extends AbstractSet<E>
implements java.io.Serializable {
private static final long serialVersionUID = 5457747651344034263L;
private final CopyOnWriteArrayList<E> al;
/**
* Creates an empty set.
*/
public CopyOnWriteArraySet() {
al = new CopyOnWriteArrayList<E>();
}
}
可以看到,它的底层也是使用的CopyOnWriteArrayList。接下来就不用多说了吧
3.3ConcurrentHashMap
请关注后续帖子
4.Callable接口
既然有了Runnable接口为什么还需要Callable接口呢?
来看看吧
class MyThread implements Runnable{
@Override
public void run() {
}
}
class MyThread2 implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("***********come in");
return 1024;
}
}
从上述代码段可以看出run方法没有返回值,且不能抛出异常,而call方法有返回值,而且能抛出异常。是否感觉很强大呢?
那么如何传入Callable呢?看他的继承树,它和Runnable接口有没有任何关系
我们可以先看看Runnable接口
它有一个实现类。是FutureTask。而FutureTask刚好可以传入一个Callable。
好了。这下我们知道怎么写代码了
public class ZZ04CallableDemo {
public static void main(String[] args)
throws ExecutionException, InterruptedException {
MyThread2 thread2 = new MyThread2();
FutureTask futureTask = new FutureTask(thread2);
new Thread(futureTask,"a").start();
System.out.println(futureTask.get());
}
}
这下就可以传入了。
那么Callable究竟是为了解决什么样的问题而存在呢?
重点就是返回值,可以获取在不同的计算场景中获取他们的返回值,成功或失败等
以上述代码为例
当在主线程中执行比较耗时的任务时,又不想阻塞主线程。可以将这些任务交给Callable在后台完成。当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。
一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。
重点:仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成, 就不能再重新开始或取消计算。get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态, 然后会返回结果或者抛出异常。
也就是说调用get方法时,如果get方法未执行完毕,那么主线程也会被阻塞。所以说get方法最好是放在最后执行。
5.常用工具类
5.1CountDownLatch
计数器
例子代码:教室里有6个学生,必须等6个学生全部离开教室,教室门才可以关闭
public class ZZ05CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i <6 ; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"离开教室");
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"班长关门走人");
}
}
CountDownLatch主要有两个方法:
countDown():其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞)
await():当一个或多个线程调用await方法时,这些线程会阻塞。当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。
5.2CyclicBarrier
可循环使用的屏障
例子代码:当阻塞线程达到7个,才可以召唤神龙
public class ZZ06CycliBarrierDemo {
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
{
System.out.println("召唤神龙");
};
});
for (int i = 0; i < 7; i++) {
final int tempInt = 1;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"收集到第" + tempInt + "颗龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
CyclicBarrier内部有一个方法
await():当前线程进入阻塞状态(进入屏障)
只有当最后一个线程到达屏障点。匿名内部类的方法和所有被屏障拦截的线程才会继续执行。
5.3Semaphore
信号量
例子代码:6辆车抢占3个车位
public class ZZ07SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);//模拟资源类,有三个空车位
for (int i = 0; i < 6; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢占到了车位");
//暂停一会儿线程
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName() + "离开了车位");
} catch (Exception e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}, String.valueOf(i)).start();
}
}
}
Semphore内部有两个方法
acquire():获取,当线程调用此方法时,要么成功获取信号量(信号量-1),要么一直阻塞,直到有线程释放信号量或者超时
release():释放,当线程调用此方法时,会释放信号量(信号量+1)。然后唤醒阻塞的线程。
Semphore的使用主要有两个目的,一个是设置多线程的互斥使用,一个是控制并发量
6.ReadWriteLock
在传统的高并发情况下,不管读和写都会加锁。其他线程只能等到获取到锁后才能去执行自己的操作。这样的话如果只是读取操作,也会导致效率变慢
所以诞生了读写锁。只有读操作时为了满足高并发只是加共享锁,此时读操作允许多个线程同时访问。当某一个线程拿到写锁时,此时会加排它锁。除此线程外所有线程都不能读写
总而言之就是 读-读 可以共存 读-写 不可以共存 写-写 不可以共存
class Cache {
private volatile Map<String, Object> map = new HashMap<>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
readWriteLock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + "写入数据"+ value );
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "写入完成");
readWriteLock.writeLock().unlock();
}
public void get(String key) {
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "读取了数据");
try {
TimeUnit.MICROSECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object o = map.get(key);
System.out.println(Thread.currentThread().getName() + "读取完成" + o);
readWriteLock.readLock().unlock();
}
}
public class ZZ08ReadOnWriteLockDemo {
public static void main(String[] args) {
Cache cache = new Cache();
for (int i = 0; i < 5; i++) {
final int tempInt = i;
new Thread(() -> {
cache.put(tempInt+"",tempInt + "");
}, String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
final int tempInt = i;
new Thread(() -> {
cache.get(tempInt+"");
}, String.valueOf(i)).start();
}
}
}
底层源码请关注后续AQS解读
7.BlockingQueue(阻塞队列)
阻塞队列是用于在高并发情况下不得不阻塞线程的时候,如何管理线程和资源的一种手段
Thread1向阻塞队列中添加元素,Thread2从队列中取走元素
当队列是空的,从对列中取出元素的操作将会被阻塞
当队列是满的,向对列中添加元素的操作将会被阻塞
试图从空的队列中获取元素的线程将会被阻塞,直到其他线程往空的队列插入新的元素
试图向已满的队列中添加新元素的线程将会被阻塞,直到其他线程从队列中移除一个或多个元素或者完全清空,使队列变得空闲起来并后续新增
阻塞队列的优势:
不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为一切都由BlockingQueue包办了。
首先看看阻塞队列的继承树
ArrayBlockQuque:由数组结构组成的有界的阻塞队列
LinkedBlockingQueue:由链表结构组成的有界(界限为Integer.MAX_VALUE)的阻塞队列
SynchronousQueue:不存储元素的阻塞队列。也就是单个元素的队列
LinkedBlockingDeque:由链表结构组成的有界(界限为Integer.MAX_VALUE)的双向阻塞队列
具体的方法见下
public class ZZ09BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> blockingQueue= new ArrayBlockingQueue<>(3);
/*此组add方法与remove,超出队列界限会抛出异常
System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.add("c"));
blockingQueue.remove();
blockingQueue.remove();
blockingQueue.remove();
blockingQueue.remove();
System.out.println(blockingQueue.add("d"));*/
/*System.out.println(blockingQueue.add("a"));
System.out.println(blockingQueue.add("b"));
System.out.println(blockingQueue.element());//返回队列第一个元素*/
/*此组offer与poll方法超出队列界限不会抛出异常只返回false和null
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b"));
System.out.println(blockingQueue.offer("c"));
System.out.println(blockingQueue.offer("x"));
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.poll());*/
/*put,当队列没有满,可以往里添加,满了后续的put操作会被阻塞*/
/*blockingQueue.put("a");
blockingQueue.put("b");
blockingQueue.put("c");
blockingQueue.put("d");
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());*/
/* System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("a"));
System.out.println(blockingQueue.offer("b", 3L, TimeUnit.SECONDS));*/
}
}
8.线程池
什么是线程池?
线程池是一种多线程处理任务的一种方式。处理过程中将任务添加到队列(也就是我们上一讲的阻塞队列中),然后这些线程自动启动这些任务
线程池的主要工作就是控制运行的线程数量,处理过程中将任务放入阻塞队列,然后在线程创建后去执行这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等待其他线程执行完毕,再从队列中取出任务来执行
主要特点:
线程复用
控制最大并发数
管理线程。
优势:
1.重复利用已经创建的线程,降低线程创建和销毁所带来的开销
2.任务可以不需要等待线程创建就可以执行,从而提高响应速度
3.提高线程的可管理性,使用线程池可以进行集中的监控,分配和调优。
看看Java中线程池的继承树
Executors类似于Collections,它作为线程池的工具类为我们提供了很多强大的功能
例子代码如下:
public class ZZ11MyThreadPoolDemo {
public static void main(String[] args) {
/*
Executors 辅助工具类,类似于collections
* */
/* ExecutorService threadPool = Executors.newFixedThreadPool(5);//一池五个线程,类似于银行有五个受理窗口
ExecutorService threadPool = Executors.newSingleThreadExecutor();*///一个池子一个线程,类似于银行有一个受理窗口
//一池子N个工作线程,类似于银行有N个受理窗口。相当于前两个线程池的结合体,
//执行很多任务时可以使用,线程池根据任务需要进行创建线程,但是在先前的线程可用的时候重用他们,在不可用的时候新建线程(扩容)
//ExecutorService threadPool = Executors.newCachedThreadPool();
//自定义线程池
ExecutorService threadPool = new ThreadPoolExecutor
(2, 5, 2L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy());
try {
/*模拟十个顾客过来办理业务,目前池子只有五个工作人员提供服务*/
for (int i = 0; i < 9; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
我们看看它们是如何创建线程池的
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
可以看到在源码中,ThreadPoolExecutor中有七个参数
int corePoolSize 常驻核心线程数
int maximumPoolSize 线程池中能容纳同时执行的最大线程数
long keepAliveTime 多余空闲线程的存活时间,如果达到,线程会被销毁
TimeUnit unit keepAliveTime的单位
BlockingQueue<Runnable> workQueue 任务队列,被提交但是尚未被执行的任务
ThreadFactory threadFactory 创建线程的工厂
RejectedExecutionHandler handler 拒绝策略,当队列满了,并且工作线程大于等于线程池的最大线程数时如何来拒绝请求执行的runnable的策略
流程:
1.线程池创建以后,开始等待任务
2.当调用execute()方法后,添加一个请求任务后,并做出如下判断
2.1 如果当前运行线程小于corePoolSize,那么马上创建线程来执行此任务
2.2如果当前运行线程大于或者等于corePoolSize,那么将任务添加到workQueue
2.3如果当前workQueue已经满了,且当前运行线程数小于maximumPoolSize,那么还是会继续创建线程来执行任务
2.4如果当前workQueue已经满了,且当前运行线程数大于或者等于maximumPoolSize,那么会启动饱和拒绝策略来执行
3.当一个线程完成任务后,它会从队列中取出下一个任务来执行
4.当一个线程空闲时间超过keepAliveTime,线程会判断
如果当前运行的线程数大于corePoolSize,那么会立即销毁掉此线程。最终线程数会回缩到corePoolSize
线程池的拒绝策略
可以看到RejectedExecutionHandler有四个实现类
AbortPolicy:默认策略,直接抛出RejectedExecutionException异常来阻止系统的继续运行
CallerRunsPolicy:调用者运行策略,将任务回退给发送此任务的线程(在测试代码中就是main线程),让此线程来执行此任务
DiscardOldestPolicy:抛弃阻塞队列中等待最久的任务,然后将该任务放入阻塞队列
DiscardPolicy:默默丢弃无法处理的任务,不予以任何处理也不报任何异常(允许任务丢失)
源码:
所谓流程是源码的文字表达。那么我们来看看源码是如何实现的。首先看看ThreadPoolExecutor的成员变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//Integer.SIZE表示int类型占用32位
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
private final BlockingQueue<Runnable> workQueue
//可重入锁对象
private final ReentrantLock mainLock = new ReentrantLock();
//这个就是线程池,用于存放线程的
private final HashSet<Worker> workers = new HashSet<Worker>();
//condition
private final Condition termination = mainLock.newCondition();
//最大的线程数
private int largestPoolSize;
//已经执行完成的任务
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
//是否允允许核心线程超时销毁,默认是false,如果为true。当核心线程空闲时间达到keepAliveTime,核心线程也会被销毁
private volatile boolean allowCoreThreadTimeOut;
private volatile int corePoolSize
private volatile int maximumPoolSize
上面我们看到了HashSet<Worker> workers = new HashSet<Worker>();他就是线程池。我们看看Worker的继承树
可以看到Worker是一个Runnable接口的实现类,而且是一个AQS。AQS后续再说
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//线程对象
final Thread thread;
//又是一个Runabble对象,代表第一次要执行的任务
Runnable firstTask;
//完成的任务
volatile long completedTasks;
//构造器
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
//重点:此处创建了一个新的线程,并且将this。当前Worker对象传入,言外之意就是要执行Worker的run方法
this.thread = getThreadFactory().newThread(this);
}
//我们来看看runWorker究竟执行了什么
public void run() {
runWorker(this);
}
}
进入ThreadPoolEXecutor的runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
本文地址:https://blog.csdn.net/jjk45655/article/details/112517150
上一篇: 京东PLUS会员月度优惠券怎么领取?
下一篇: ASP.NET程序发布详细过程