java实现线程通信的几种方式
前言
在多线程的世界里,线程与线程之间的交互无处不在,只不过在平时的开发过程中,大多数情况下,我们都在单线程的模式下进行编码,即使有,也直接借助框架自身的机制实现了,其实线程之间的通信在JDK中是一个比较深的问题,比如大家熟知的消息中间件的实现,从某种角度上讲,就借助了多线程通信的思想,下面总结了JDK中常用的几种实现线程通信的方式,提供参考
1、synchronized实现方式
可能很多小伙伴们会有疑问,synchronized是对共享资源加锁使用的,怎么和线程通信扯在一起呢?这里纠正一个小小的偏见,也是我近期才矫正过来的
我们要弄明白的一点是,为什么会存在线程通讯这个问题呢?根据一些技术大牛们的说法就是,多个线程之间需要相互传递一些参数、变量或者是各个线程的执行需要互相依赖各自的结果,比如我们熟知的生产者消费者模式,只有生产者生产出来了东西,消费者才能进行消费啊
这里模拟假如有2个线程,需要操作一个共享资源,即修改共享资源的数据,使用synchronized的方式如下:
public class SycDemo1 {
private static Object lock = new Object();
private static String weather = "sunny";
public static void main(String[] args) {
new Thread(()->{
synchronized (lock){
System.out.println("t1 before change weather is :" + weather);
weather = "raing";
System.out.println("t1 after change weather is :" + weather);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t1").start();
new Thread(()->{
synchronized (lock){
System.out.println("t2 before change weather is :" + weather);
weather = "snow";
System.out.println("t2 after change weather is :" + weather);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t2").start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
Thread.sleep(2000);
System.out.println("main thread weather is:" +weather);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
代码比较简单,2个线程去修改一个weather 的共享变量,由于线程启动不依赖代码的顺序,任何一个获得了lock锁的线程就会先执行对weather的修改,从控制台的打印输出可以看到,可以满足我们的需求,这里也可以说明,synchronized属于独占锁或者排它锁,即某个时刻只有一个线程可以持有这把锁
既然说到这里,我们就对synchronized的锁原理做一点深入的说明,仅提供参考。
在多线程访问共享资源的时候,经常会带来可见性和原子性的安全问题。为了解决这类线程安全的问题,java提供了同步机制、互斥机制,这个机制保证了在同一时刻只有一个线程能访问共享资源。这个机制来源于监视锁Monitor,每个对象都拥有自己的监视锁Monitor,比如上面的案例中的lock,就是Object对象的一个锁,那么说在被synchronized修饰了之后,这个lock上面就存在了一个Monitor,这个Monitor的作用是什么呢?
我们设想,监视器就是在某一时刻有一个小房子,在这个小房子里面放置了一些资源,这里就可以理解为weather共享的变量,这时候,有很多人(多个线程)需要访问这个资源,但是Monitor上面有一把锁,多个线程只有一个人能够抢占成功并访问资源,二其余的则会进行等待,这里为阻塞,从Monitor的数据结构上说,就是将其他线程加入到一个waitSet中,直到持有锁的线程释放了lock。
但这里的问题是,这个waitSet并非像是queue那样具有顺序性,一旦某个线程的锁释放后,其他线程争抢的时候依然是随机的,即不能保证公平性,因此synchronized锁是非公平锁。
下面借助一张简图方便理解synchronized的锁原理
2、join() 实现方式
thread.Join把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行的线程。
比如在线程B中调用了线程A的Join()方法,直到线程A执行完毕后,才会继续执行线程B。
我们来看下面的例子:
public class JoinDemo {
private static int counter = 1;
private static int result = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
counter = 2;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1");
t1.start();
System.out.println("等待线程1执行完毕结果....");
t1.join();
Thread t2 = new Thread(() -> {
counter = 2;
result = 100 + counter;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t2");
t2.start();
t2.join();
//主线程打印结果
System.out.println("主线程获取到了最终的执行结果:" + result);
}
}
通过控制台的打印输出结果顺序,我们可以看到,join的作用就像是起到了一个线程暂停的作用,某个线程要得到另一个线程的结果之前被阻塞了一样
其实,join方法是通过调用线程的wait方法来达到同步的目的的。例如,A线程中调用了B线程的join方法,则相当于A线程调用了B线程的wait方法,在调用了B线程的wait方法后,A线程就会进入阻塞状态
3、wait notify 实现方式
wait() notify()应该是大家比较熟悉的进行线程通讯的方式了,常见于生产者、消费者模式中使用,他们属于Object的方法,一般需要配合起来使用,即有wait()就有notify(),或者notifyall(),同时在使用wait notify的时候通常是要使用synchronized进行代码块的保护,下面来看一个简单的示例:
public class NotifyDemo {
private static Object lock = new Object();
private static int isRunning = 0;
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(() -> {
synchronized (lock) {
while (isRunning == 0) {
System.out.println("t1 获得了锁,但是状态还没有变过去,t1再等会 ...... ");
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("状态变为1 了,t1开始工作了 ......");
}
}, "t1");
Thread t2 = new Thread(() -> {
while (true) {
synchronized (lock) {
System.out.println("t2 获得了锁 ... 开始执行自己的逻辑");
try {
//模拟执行耗时的业务
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (isRunning == 0) {
isRunning = 1;
lock.notifyAll();
break;
}
}
}
}, "t2");
t1.start();
t2.start();
}
}
本例的代码模拟的是,第一个线程要去获取一个全局变量,当这个变量的值不为1的时候就阻塞并一直等待,第二线程去修改这个变量值,修改完毕后,通过notifyAll方法唤醒等待中的线程1
使用wait / notify有几点注意的:
- 在wait的线程中采用while循环的方式,使用if的方式在某些场景下也是可以的, if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了安全起见,安全起见,使用while的形式更好
- notify可以唤醒某个等待中的线程,但notify 只能随机唤醒一个 WaitSet (在前面提到过)中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为【虚假唤醒】,为了解决这个问题,建议使用notifyAll
- 调用了wait之后,当前的线程会让出cpu时间片,即在被唤醒之前的时间放弃对锁资源的占用,否则notify的线程就无法获取到锁了,而我们的熟知的Thread.sleep正好相反
wait / notify 比较常用的场景就是在生产者消费者模式中,下面通过一个简单的例子演示一下:
producer代码:
public class Producer implements Runnable {
private final List<Integer> container;
public Producer(List<Integer> container) {
this.container = container;
}
private void produce() throws InterruptedException {
int capacity = 5;
synchronized (container) {
while (container.size() == capacity) {
System.out.println("...容器已满,暂停生产...");
container.wait();
}
Random random = new Random();
int p = random.nextInt(50);
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println("生产的产品编号为:" + p);
container.add(p);
container.notifyAll();
}
}
@Override
public void run() {
while (true) {
try {
produce();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
consumer代码:
public class Consumer implements Runnable {
private final List<Integer> container;
public Consumer(List<Integer> container) {
this.container = container;
}
private void consume() throws InterruptedException {
synchronized (container) {
while (container.isEmpty()) {
System.out.println("...容器已空,暂停消费...");
container.wait();
}
Integer p = container.remove(0);
TimeUnit.MILLISECONDS.sleep(1000);
System.out.println("消费的产品编号为:" + p);
container.notifyAll();
}
}
@Override
public void run() {
while (true) {
try {
consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
代码测试:
public static void main(String[] args) {
List<Integer> container = new ArrayList<>();
Thread producer = new Thread(new Producer(container));
Thread consumer = new Thread(new Consumer(container));
producer.start();
consumer.start();
}
启动观察控制台输出结果,为了演示出效果,上面我们使用了短暂的休眠,真正使用的时候速度是非常快的,同时在真实的应用中,消费者线程往往存在多个,即多个消费者线程共同消费容器里面的数据
4、Park & Unpark 实现方式
它们是 LockSupport 类中的方法,park方法是将当前调用Thread阻塞,而unpark则是将指定线程Thread唤醒,
- LockSupport.park(); 暂停当前线程
- LockSupport.unpark(暂停线程对象) 恢复某个线程的运行
与Object类的wait/notify机制相比,park/unpark有两个优点:
- 以thread为操作对象更符合阻塞线程的直观定义
- 操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程),增加了灵活性
- park/unpark的设计原理核心是“许可”:park是等待一个许可,unpark是为某线程提供一个许可。如果某线程A调用park,那么除非另外一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。
简单来说,就是当我们准确知道2个线程的交互场景时候,使用park / unpark是非常适合的,他们的配合很灵活,无需像wait/notify那样需要synchronized进行保护,同时也无需关注两个线程的启动顺序,下面来看一个简单的示意代码:
public class ParkDemo {
private static int resourceCount = 0;
static Thread t1;
static Thread t2;
public static void main(String[] args) {
t1 = new Thread(() -> {
while (resourceCount == 0) {
System.out.println("t1获取到的资源是: " + resourceCount);
resourceCount = 1;
LockSupport.park();
LockSupport.unpark(t2);
break;
}
}, "t1");
t2 = new Thread(() -> {
while (resourceCount == 0) {
LockSupport.unpark(t1);
System.out.println("t2获取到的资源是: " + resourceCount);
break;
}
}, "t2");
t1.start();
t2.start();
}
}
我们发现,不管是先调用t1还是t2的start()方法,最终的结果都一样
5、lock 的方式
JDK中lock是一种更灵活的控制线程执行动作的锁,相比synchronized不仅加锁解锁方便,而且提供了更丰富的API操作,可以很方便的对代码块进行各种加解锁操作,但说到线程通讯,这里我们要说一下lock锁的一个具体实现ReentrantLock,在该对象中提供了一个叫做一个Condition用于控制加锁条件,可以用它在多线程中实现线程通讯
我们可以设想,lock是一把锁,可以用这把锁创建多个condition对象,简单理解为每个condition就是一个等待的房间,不同的线程到来了以后可以根据分类归于不同的condition中去,后续等到条件满足的时候,依次唤醒各个condition中的等待线程即可
public class LockTestDemo {
private static int resourceNum = 1;
Lock lock = new ReentrantLock();
Condition conditionFull = lock.newCondition();
Condition conditionEmpty = lock.newCondition();
List<Integer> containers = new ArrayList<>();
public static void main(String[] args) {
LockTestDemo demo = new LockTestDemo();
new Thread(()->{
demo.put();
},"t1").start();
new Thread(()->{
demo.get();
},"t2").start();
}
public void put(){
lock.lock();
try {
while(containers.size()>=5){
try {
conditionEmpty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int num = new Random().nextInt(10);
containers.add(num);
System.out.println("生产者往容器里面放了一个,通知消费者消费 :" + num);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
conditionFull.signal();
}finally {
lock.unlock();
}
}
public Integer get(){
lock.lock();
try {
while(containers.size()==0){
try {
conditionFull.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
conditionEmpty.signal();
Integer remove = containers.remove(0);
System.out.println("消费者从容器里面取走一个,通知生产者生产 :" + remove);
return remove;
}finally {
lock.unlock();
}
}
}
关于Lock锁的更多和深入的用法,有兴趣的同学可以继续研究,本篇暂不做过多的说明
本篇主要讲解了一下关于JDK中实现线程通讯的几种方式,有兴趣的可以对每种方式的实现原理做更深入的研究,最后感谢观看!
上一篇: 动态代理的几种实现方式及优缺点