欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java实现线程通信的几种方式

程序员文章站 2022-06-17 19:53:12
...

前言

在多线程的世界里,线程与线程之间的交互无处不在,只不过在平时的开发过程中,大多数情况下,我们都在单线程的模式下进行编码,即使有,也直接借助框架自身的机制实现了,其实线程之间的通信在JDK中是一个比较深的问题,比如大家熟知的消息中间件的实现,从某种角度上讲,就借助了多线程通信的思想,下面总结了JDK中常用的几种实现线程通信的方式,提供参考

1、synchronized实现方式

可能很多小伙伴们会有疑问,synchronized是对共享资源加锁使用的,怎么和线程通信扯在一起呢?这里纠正一个小小的偏见,也是我近期才矫正过来的

我们要弄明白的一点是,为什么会存在线程通讯这个问题呢?根据一些技术大牛们的说法就是,多个线程之间需要相互传递一些参数、变量或者是各个线程的执行需要互相依赖各自的结果,比如我们熟知的生产者消费者模式,只有生产者生产出来了东西,消费者才能进行消费啊

这里模拟假如有2个线程,需要操作一个共享资源,即修改共享资源的数据,使用synchronized的方式如下:

public class SycDemo1 {

    private static Object lock = new Object();

    private static String weather = "sunny";

    public static void main(String[] args) {

        new Thread(()->{
            synchronized (lock){
                System.out.println("t1 before change weather is :" + weather);
                weather = "raing";
                System.out.println("t1 after change weather is :" + weather);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"t1").start();

        new Thread(()->{
            synchronized (lock){
                System.out.println("t2 before change weather is :" + weather);
                weather = "snow";
                System.out.println("t2 after change weather is :" + weather);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"t2").start();

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            Thread.sleep(2000);
            System.out.println("main thread weather is:" +weather);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

}

代码比较简单,2个线程去修改一个weather 的共享变量,由于线程启动不依赖代码的顺序,任何一个获得了lock锁的线程就会先执行对weather的修改,从控制台的打印输出可以看到,可以满足我们的需求,这里也可以说明,synchronized属于独占锁或者排它锁,即某个时刻只有一个线程可以持有这把锁
java实现线程通信的几种方式

既然说到这里,我们就对synchronized的锁原理做一点深入的说明,仅提供参考。

在多线程访问共享资源的时候,经常会带来可见性和原子性的安全问题。为了解决这类线程安全的问题,java提供了同步机制、互斥机制,这个机制保证了在同一时刻只有一个线程能访问共享资源。这个机制来源于监视锁Monitor,每个对象都拥有自己的监视锁Monitor,比如上面的案例中的lock,就是Object对象的一个锁,那么说在被synchronized修饰了之后,这个lock上面就存在了一个Monitor,这个Monitor的作用是什么呢?

我们设想,监视器就是在某一时刻有一个小房子,在这个小房子里面放置了一些资源,这里就可以理解为weather共享的变量,这时候,有很多人(多个线程)需要访问这个资源,但是Monitor上面有一把锁,多个线程只有一个人能够抢占成功并访问资源,二其余的则会进行等待,这里为阻塞,从Monitor的数据结构上说,就是将其他线程加入到一个waitSet中,直到持有锁的线程释放了lock。

但这里的问题是,这个waitSet并非像是queue那样具有顺序性,一旦某个线程的锁释放后,其他线程争抢的时候依然是随机的,即不能保证公平性,因此synchronized锁是非公平锁。

下面借助一张简图方便理解synchronized的锁原理
java实现线程通信的几种方式

2、join() 实现方式

thread.Join把指定的线程加入到当前线程,可以将两个交替执行的线程合并为顺序执行的线程。

比如在线程B中调用了线程A的Join()方法,直到线程A执行完毕后,才会继续执行线程B。

我们来看下面的例子:

public class JoinDemo {

    private static int counter = 1;
    private static int result = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> {
            counter = 2;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t1");

        t1.start();
        System.out.println("等待线程1执行完毕结果....");
        t1.join();

        Thread t2 = new Thread(() -> {
            counter = 2;
            result = 100 + counter;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "t2");
        t2.start();
        t2.join();
        //主线程打印结果
        System.out.println("主线程获取到了最终的执行结果:" + result);
    }
}

通过控制台的打印输出结果顺序,我们可以看到,join的作用就像是起到了一个线程暂停的作用,某个线程要得到另一个线程的结果之前被阻塞了一样

其实,join方法是通过调用线程的wait方法来达到同步的目的的。例如,A线程中调用了B线程的join方法,则相当于A线程调用了B线程的wait方法,在调用了B线程的wait方法后,A线程就会进入阻塞状态
java实现线程通信的几种方式

3、wait notify 实现方式

wait() notify()应该是大家比较熟悉的进行线程通讯的方式了,常见于生产者、消费者模式中使用,他们属于Object的方法,一般需要配合起来使用,即有wait()就有notify(),或者notifyall(),同时在使用wait notify的时候通常是要使用synchronized进行代码块的保护,下面来看一个简单的示例:

public class NotifyDemo {

    private static Object lock = new Object();
    private static int isRunning = 0;

    public static void main(String[] args) throws InterruptedException {

        Thread t1 = new Thread(() -> {
            synchronized (lock) {
                while (isRunning == 0) {
                    System.out.println("t1 获得了锁,但是状态还没有变过去,t1再等会 ...... ");
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("状态变为1 了,t1开始工作了 ......");
            }
        }, "t1");

        Thread t2 = new Thread(() -> {
            while (true) {
                synchronized (lock) {
                    System.out.println("t2 获得了锁 ... 开始执行自己的逻辑");
                    try {
                        //模拟执行耗时的业务
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (isRunning == 0) {
                        isRunning = 1;
                        lock.notifyAll();
                        break;
                    }
                }
            }
        }, "t2");

        t1.start();
        t2.start();
    }

}

本例的代码模拟的是,第一个线程要去获取一个全局变量,当这个变量的值不为1的时候就阻塞并一直等待,第二线程去修改这个变量值,修改完毕后,通过notifyAll方法唤醒等待中的线程1
java实现线程通信的几种方式
使用wait / notify有几点注意的:

  • 在wait的线程中采用while循环的方式,使用if的方式在某些场景下也是可以的, if + wait 判断仅有一次机会,一旦条件不成立,就没有重新判断的机会了安全起见,安全起见,使用while的形式更好
  • notify可以唤醒某个等待中的线程,但notify 只能随机唤醒一个 WaitSet (在前面提到过)中的线程,这时如果有其它线程也在等待,那么就可能唤醒不了正确的线程,称之为【虚假唤醒】,为了解决这个问题,建议使用notifyAll
  • 调用了wait之后,当前的线程会让出cpu时间片,即在被唤醒之前的时间放弃对锁资源的占用,否则notify的线程就无法获取到锁了,而我们的熟知的Thread.sleep正好相反

wait / notify 比较常用的场景就是在生产者消费者模式中,下面通过一个简单的例子演示一下:

producer代码:

public class Producer implements Runnable {

    private final List<Integer> container;

    public Producer(List<Integer> container) {
        this.container = container;
    }

    private void produce() throws InterruptedException {
        int capacity = 5;
        synchronized (container) {
            while (container.size() == capacity) {
                System.out.println("...容器已满,暂停生产...");
                container.wait();
            }
            Random random = new Random();
            int p = random.nextInt(50);
            TimeUnit.MILLISECONDS.sleep(1000);
            System.out.println("生产的产品编号为:" + p);
            container.add(p);
            container.notifyAll();
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                produce();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

consumer代码:

public class Consumer implements Runnable {

    private final List<Integer> container;

    public Consumer(List<Integer> container) {
        this.container = container;
    }
    
    private void consume() throws InterruptedException {
        synchronized (container) {
            while (container.isEmpty()) {
                System.out.println("...容器已空,暂停消费...");
                container.wait();
            }
            Integer p = container.remove(0);
            TimeUnit.MILLISECONDS.sleep(1000);
            System.out.println("消费的产品编号为:" + p);
            container.notifyAll();
        }
    }

    @Override
    public void run() {
        while (true) {
            try {
                consume();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

代码测试:

public static void main(String[] args) {
        List<Integer> container = new ArrayList<>();
        Thread producer = new Thread(new Producer(container));
        Thread consumer = new Thread(new Consumer(container));
        producer.start();
        consumer.start();
      }

启动观察控制台输出结果,为了演示出效果,上面我们使用了短暂的休眠,真正使用的时候速度是非常快的,同时在真实的应用中,消费者线程往往存在多个,即多个消费者线程共同消费容器里面的数据
java实现线程通信的几种方式

4、Park & Unpark 实现方式

它们是 LockSupport 类中的方法,park方法是将当前调用Thread阻塞,而unpark则是将指定线程Thread唤醒,

  • LockSupport.park(); 暂停当前线程
  • LockSupport.unpark(暂停线程对象) 恢复某个线程的运行

与Object类的wait/notify机制相比,park/unpark有两个优点:

  1. 以thread为操作对象更符合阻塞线程的直观定义
  2. 操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程),增加了灵活性
  3. park/unpark的设计原理核心是“许可”:park是等待一个许可,unpark是为某线程提供一个许可。如果某线程A调用park,那么除非另外一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。

简单来说,就是当我们准确知道2个线程的交互场景时候,使用park / unpark是非常适合的,他们的配合很灵活,无需像wait/notify那样需要synchronized进行保护,同时也无需关注两个线程的启动顺序,下面来看一个简单的示意代码:

public class ParkDemo {

    private static int resourceCount = 0;
    static Thread t1;
    static Thread t2;

    public static void main(String[] args) {
        t1 = new Thread(() -> {
            while (resourceCount == 0) {
                System.out.println("t1获取到的资源是: " + resourceCount);
                resourceCount = 1;
                LockSupport.park();
                LockSupport.unpark(t2);
                break;
            }
        }, "t1");

        t2 = new Thread(() -> {
            while (resourceCount == 0) {
                LockSupport.unpark(t1);
                System.out.println("t2获取到的资源是: " + resourceCount);
                break;
            }
        }, "t2");
        t1.start();
        t2.start();
    }

}

我们发现,不管是先调用t1还是t2的start()方法,最终的结果都一样

java实现线程通信的几种方式
5、lock 的方式

JDK中lock是一种更灵活的控制线程执行动作的锁,相比synchronized不仅加锁解锁方便,而且提供了更丰富的API操作,可以很方便的对代码块进行各种加解锁操作,但说到线程通讯,这里我们要说一下lock锁的一个具体实现ReentrantLock,在该对象中提供了一个叫做一个Condition用于控制加锁条件,可以用它在多线程中实现线程通讯

我们可以设想,lock是一把锁,可以用这把锁创建多个condition对象,简单理解为每个condition就是一个等待的房间,不同的线程到来了以后可以根据分类归于不同的condition中去,后续等到条件满足的时候,依次唤醒各个condition中的等待线程即可

public class LockTestDemo {

    private static int resourceNum = 1;

    Lock lock = new ReentrantLock();
    Condition conditionFull = lock.newCondition();
    Condition conditionEmpty = lock.newCondition();
    List<Integer> containers = new ArrayList<>();

    public static void main(String[] args) {
        LockTestDemo demo = new LockTestDemo();
        new Thread(()->{
            demo.put();
        },"t1").start();

        new Thread(()->{
            demo.get();
        },"t2").start();

    }

    public void put(){
        lock.lock();
        try {
            while(containers.size()>=5){
                try {
                    conditionEmpty.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            int num = new Random().nextInt(10);
            containers.add(num);
            System.out.println("生产者往容器里面放了一个,通知消费者消费 :" + num);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            conditionFull.signal();
        }finally {
            lock.unlock();
        }
    }

    public Integer get(){
        lock.lock();
        try {
            while(containers.size()==0){
                try {
                    conditionFull.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            conditionEmpty.signal();
            Integer remove = containers.remove(0);
            System.out.println("消费者从容器里面取走一个,通知生产者生产 :" + remove);
            return remove;
        }finally {
            lock.unlock();
        }
    }

}

关于Lock锁的更多和深入的用法,有兴趣的同学可以继续研究,本篇暂不做过多的说明
java实现线程通信的几种方式

本篇主要讲解了一下关于JDK中实现线程通讯的几种方式,有兴趣的可以对每种方式的实现原理做更深入的研究,最后感谢观看!