欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java线程间通信

程序员文章站 2022-06-07 13:19:27
...

Java线程间通信

线程间通信又称为进程内通信,多个线程实现互斥访问共享资源时会互相发送信号或者等待信号。

一、同步阻塞和异步非阻塞

同步阻塞

指的是客户端和服务器通信的时候,每向服务器请求一个事件,如果没有得到想要的结果,就进入阻塞状态,相应地,服务器要创建多个线程来一一处理每个请求,请求完成之后,返回给服务端结果,完成一次交互。

缺点:

  • 同步事务提交,客户端等待时间过长。
  • 系统吞吐量不高。
  • 频繁创建开启或者销毁线程,增加系统额外开销。
  • 在业务达到峰值的时候,大量的业务处理线程阻塞会导致频繁的CPU上下文切换,降低系统性能。

异步非阻塞

客户端提交一个事务以后,会得到一个相应的工单并立即返回事务会被放在消息队列中。服务器有若干个工作线程,不断地从消息队列中获取任务并进行异步处理,最后将处理结果保存至另外一个结果集中,如果客户想要获得处理结果,可凭借工单号再次查询。

优点:

  • 提高了系统的吞吐量和并发量
  • 服务端的线程数量在一个可控的范围之内就不会导致太多的上下文切换而带来额外的开销
  • 服务器端的线程可以重复利用,避免了资源浪费。

二、wait和notify

wait方法和notify方法都是Object中的方法,也就是说在JDK中的每一个类都有这两个方法。

wait

  • Object的wait(long timeout)方法会使当前线程进入阻塞,直到有其他线程调用了Object的notify或者notifyAll方法才能将其唤醒,或者阻塞时间到达timeout自动唤醒。
  • 使用wait方法必须拥有该对象的monitor,即必须在同步方法中使用。
  • 当前线程执行了该对象的wait方法后,会放弃monitor的所有权,并进入与该对象相关联的waitset当中,其他线程可以争夺此对象的monitor所有权。

notify

唤醒单个正在执行该对象wait方法的线程。

如果有某个线程由于执行该对象的wait方法进入阻塞则会被唤醒,如果没有则会忽略。

被唤醒的线程需要重新获取对该对象所关联monitor的lock才能继续执行。

notifyAll

notifyAll可以同时唤醒waitset中的全部阻塞线程,被唤醒的线程仍需要继续争抢monitor。

注意事项

  • wait是可中断的方法,其他线程的interrupt方法可以将其打断。
  • 每个对象的monitor都有一个与之关联的waitset。
  • 当线程加入waitset后,notify方法可以将其唤醒,中断wait中的线程也会将其唤醒。
  • 必须在同步方法中使用wait和notify方法,因为使用的前提是此线程获得了此对象monitor的所有权。
  • 同步代码的monitor必须与执行wait、notify方法对象一致。

wait和sleep方法比较

  • 都可以使线程进入阻塞状态。
  • 都是可中断方法。被中断后都会受到中断异常。
  • wait是Object的方法,sleep是Thread的方法。
  • wait必须在同步方法中进行,而sleep不需要。
  • 线程在执行sleep时,无需释放monitor,而wait需要释放。
  • sleep休眠后会主动退出阻塞,而wait方法需要被其他线程中断才退出阻塞。

三、生产者消费者例子

public class MultiThreadEventQueue {

    private final int max;

    public MultiThreadEventQueue(int max) {
        this.max = max;
    }

    private final static int DEFAULT_MAX_EVENT = 10;

    //如果没有显示地给予队列最大容量,就用默认最大值10.
    public MultiThreadEventQueue() {
        this(DEFAULT_MAX_EVENT);
    }

    //模拟事件类
    static class Event {
    }

    //模拟消息队列
    private final LinkedList<Event> eventQueue = new LinkedList<>();

    public void offer(Event event) {
        synchronized (eventQueue) {
            while (eventQueue.size() >= max) {
                try {
                    System.out.println(Thread.currentThread().getName() + "队列满啦!");
                    eventQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(Thread.currentThread().getName() + "新事物已经提交啦");
            //往消息队列中添加事务
            eventQueue.addLast(event);
            //唤醒因等待而阻塞的线程
            eventQueue.notifyAll();
        }
    }

    public Event take() {
        synchronized (eventQueue) {
            while (eventQueue.isEmpty()) {
                try {
                    System.out.println(Thread.currentThread().getName() + "队列为空");
                    eventQueue.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Event event = eventQueue.removeFirst();
            this.eventQueue.notifyAll();
            System.out.println(Thread.currentThread().getName() + "这个事件:" + event + "已经处理完了");
            return event;
        }
    }

}

public class MultiEventQueueClient {

    public static void main(String[] args) {
        final MultiThreadEventQueue eventQueue=new MultiThreadEventQueue();
        new Thread(()->{
            for (;;)
            {
                eventQueue.offer(new MultiThreadEventQueue.Event());
            }
        },"Producer1").start();

        new Thread(()->{
            for (;;)
            {
                eventQueue.take();
                //模拟事务的处理过程
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"Consumer1").start();

        new Thread(()->{
            for (;;)
            {
                eventQueue.offer(new MultiThreadEventQueue.Event());
            }
        },"Producer2").start();

        new Thread(()->{
            for (;;)
            {
                eventQueue.take();
                //模拟事务的处理过程
                try {
                    TimeUnit.MILLISECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"Consumer2").start();

    }
}


输出:

Producer1新事物已经提交啦
Producer1新事物已经提交啦
Producer1新事物已经提交啦
Producer1新事物已经提交啦
Producer1新事物已经提交啦
Producer1新事物已经提交啦
Producer1新事物已经提交啦
Producer1新事物已经提交啦
Producer1新事物已经提交啦
Producer1新事物已经提交啦
Producer1队列满啦!
Consumer1这个事件:Part1_chapter5.MultiThreadEventQueue$Event@7e807af7已经处理完了
Producer1新事物已经提交啦
Producer1队列满啦!
Producer2队列满啦!
Consumer2这个事件:Part1_chapter5.MultiThreadEventQueue$Event@5c30bf90已经处理完了
Producer2新事物已经提交啦
Producer2队列满啦!
Producer1队列满啦!
Consumer2这个事件:Part1_chapter5.MultiThreadEventQueue$Event@52f74a66已经处理完了
Producer1新事物已经提交啦
Producer1队列满啦!
Producer2队列满啦!
Consumer2这个事件:Part1_chapter5.MultiThreadEventQueue$Event@56c8d4e0已经处理完了
Producer2新事物已经提交啦
Producer2队列满啦!
Producer1队列满啦!
Consumer2这个事件:Part1_chapter5.MultiThreadEventQueue$Event@13ca405d已经处理完了
Producer1新事物已经提交啦
Producer1队列满啦!
Producer2队列满啦!
Consumer2这个事件:Part1_chapter5.MultiThreadEventQueue$Event@4adfa18b已经处理完了
Producer2新事物已经提交啦
Producer2队列满啦!
Producer1队列满啦!
Consumer2这个事件:Part1_chapter5.MultiThreadEventQueue$Event@221376cc已经处理完了
Producer1新事物已经提交啦
Producer1队列满啦!
Producer2队列满啦!
Consumer2这个事件:Part1_chapter5.MultiThreadEventQueue$Event@32ecc9ba已经处理完了
Producer2新事物已经提交啦
Producer2队列满啦!
Producer1队列满啦!
Consumer2这个事件:Part1_chapter5.MultiThreadEventQueue$Event@73e5bf12已经处理完了
Producer1新事物已经提交啦
Producer1队列满啦!
Producer2队列满啦!
Consumer2这个事件:Part1_chapter5.MultiThreadEventQueue$Event@44819b1c已经处理完了

相关标签: java并发与多线程