欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

JDK1.8并发之生产者消费者问题

程序员文章站 2022-05-04 20:59:42
...

本文讨论了生产者消费者模式的三种实现方式,第三种BlockingQueue实际上就是JDK对第二种方式的封装。为了提高程序的拓展性,方便测试,我用StockRoom接口来抽象产品库存的put和take操作。这样三种实现方式可以使用相同的Producer和Consumer类代码。这些可复用的代码如下:

// 测试方法:
private static void test(StockRoom stockRoom) {
    new Thread(new Consumer("销售小高", stockRoom)).start();
    new Thread(new Consumer("销售小田", stockRoom)).start();
    new Thread(new Producer("王工", stockRoom)).start();
    new Thread(new Producer("吴工", stockRoom)).start();
}

// 产品库操作的接口
interface StockRoom {
    void put(String operator, Object object) throws InterruptedException;

    Object take(String operator) throws InterruptedException;
}

// 生产者
class Producer implements Runnable {
    StockRoom stockRoom;
    String name;

    public Producer(String name, StockRoom stockRoom) {
        this.name = name;
        this.stockRoom = stockRoom;
    }

    @Override
    public void run() {
        while (true) {
            try {
                stockRoom.put(name, new Object());
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("销售员睡觉去了");
            }
        }
    }
}

// 消费者
class Consumer implements Runnable {
    StockRoom stockRoom;
    String name;

    public Consumer(String name, StockRoom stockRoom) {
        this.name = name;
        this.stockRoom = stockRoom;
    }

    @Override
    public void run() {
        while (true) {
            try {
                stockRoom.take(name);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println("建筑工程师看电影去了");
            }
        }
    }
}


1. synchronized和wait、notifyAll实现

class StockRoomSyn implements StockRoom {
    private static final int MAX_STOCK_NUM = 10;

    private int N = 0;
    private Object[] stocks = new Object[MAX_STOCK_NUM];

    public synchronized void put(String operator, Object o) throws InterruptedException {
        Random random = new Random();

        while (N == MAX_STOCK_NUM) {
            System.out.println("库存太多了,停止生产@" + operator);
            wait();
        }

        Thread.sleep((random.nextInt(3) + 1) << 10);
        stocks[N++] = o;
        System.out.println(operator + ":又建好一件房子,现在的库存是:" + N);
        notifyAll();
    }

    public synchronized Object take(String operator) throws InterruptedException {
        Random random = new Random();

        while (N == 0) {
            System.out.println("诶呀,房子都卖完了@" + operator);
            wait();
        }

        Thread.sleep((random.nextInt(3) + 1) << 10);
        final Object o = stocks[--N];
        String msg = operator + ":经理,爸爸又卖出一套房子,现在的库存是:" + (--N);
        System.out.println(msg);
        notifyAll();
        return o;
    }
}

public static void test_syn() {
    test(new StockRoomSyn());
}
/*
    诶呀,房子都卖完了@销售小田
    诶呀,房子都卖完了@销售小高
    王工:又建好一件房子,现在的库存是:1
    销售小田:经理,爸爸又卖出一套房子,现在的库存是:0
    诶呀,房子都卖完了@销售小高
    王工:又建好一件房子,现在的库存是:1
    吴工:又建好一件房子,现在的库存是:2
    王工:又建好一件房子,现在的库存是:3
    销售小高:经理,爸爸又卖出一套房子,现在的库存是:2
    销售小田:经理,爸爸又卖出一套房子,现在的库存是:1
    销售小高:经理,爸爸又卖出一套房子,现在的库存是:0
    王工:又建好一件房子,现在的库存是:1
    吴工:又建好一件房子,现在的库存是:2
    王工:又建好一件房子,现在的库存是:3
    ...
*/

多个线程put和take的时候,使用notify容易出现死锁:

比如现在有1个take线程和1个put线程因为wait方法被要求进入等待队列,如果一个put线程正占据着锁,它用完之后notify,唤醒的如果是另一个put线程,而这个put线程拿到锁后,发现put条件不满足,进入wait方法。因为此时没有线程在锁池中,所以锁就空了。等待队列中的take和put线程都没有人来唤醒它们,进入了死锁状态。

JDK1.8并发之生产者消费者问题

JDK1.8并发之生产者消费者问题

当使用notifyAll的时候,所有等待池的线程都进入锁池,这样即使put线程因为wait方法进入了等待池,其他的线程还可以竞争锁,使得等待池有可能被唤醒。当然notifyAll并不能解决所有的死锁问题,只是比notify更不容易出错。

这体现了这种模式的缺点,就是notify或notifyAll唤醒的是所有等待锁的线程,既包括put又包括take。而实际应用中,应该区别对待,即put方法在执行结束前唤醒take等待的消费者线程,take方法在执行结束前唤醒put等待的生产者线程。这就是下面要介绍的第二种方式的优点之一。


2. Lock锁和Condition实现

class StockRoomLock implements StockRoom {

    private static final int MAX_STOCK_NUM = 10;

    private Object[] stocks = new Object[MAX_STOCK_NUM];
    private int N = 0;
    private Lock lock;
    private Condition notFull, notEmpty;

    public StockRoomLock() {
        lock = new ReentrantLock();
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }

    @Override
    public void put(String operator, Object o) throws InterruptedException {
        lock.lock();
        try {
            Random random = new Random();
            while (N == MAX_STOCK_NUM) {
                System.out.println("库存太多了,停止生产@" + operator);
                notFull.await();
            }
            Thread.sleep((random.nextInt(3) + 1) << 10);
            stocks[N++] = o;
            System.out.println(operator + ":又建好一件房子,现在的库存是:" + N);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    @Override
    public Object take(String operator) throws InterruptedException {
        lock.lock();
        try {
            Random random = new Random();
            while (N == 0) {
                System.out.println("诶呀,房子都卖完了@" + operator);
                notEmpty.await();
            }
            Thread.sleep((random.nextInt(3) + 1) << 10);
            final Object o = stocks[--N];
            System.out.println(operator + ":经理,爸爸又卖出一套房子,现在的库存是:" + N);
            notFull.signal();
            return o;
        } finally {
            lock.unlock();
        }
    }
}

public static void test_lock() {
    test(new StockRoomLock());
}
/*
    诶呀,房子都卖完了@销售小高
    王工:又建好一件房子,现在的库存是:1
    销售小田:经理,爸爸又卖出一套房子,现在的库存是:0
    吴工:又建好一件房子,现在的库存是:1
    销售小高:经理,爸爸又卖出一套房子,现在的库存是:0
    王工:又建好一件房子,现在的库存是:1
    销售小田:经理,爸爸又卖出一套房子,现在的库存是:0
    吴工:又建好一件房子,现在的库存是:1
    销售小高:经理,爸爸又卖出一套房子,现在的库存是:0
    王工:又建好一件房子,现在的库存是:1
    销售小田:经理,爸爸又卖出一套房子,现在的库存是:0
    吴工:又建好一件房子,现在的库存是:1
    销售小高:经理,爸爸又卖出一套房子,现在的库存是:0
    ...
*/


3. BlockingQueue实现

BlockingQueue就像是一个封装好的StockRoom对象,直接使用它的put和take方法就行了,这里的代码只是为了方便大家比较:

class StockRoomBQueue implements StockRoom {

    private static final int MAX_STOCK_NUM = 10;

    private BlockingQueue<Object> queue = new ArrayBlockingQueue<>(MAX_STOCK_NUM);

    @Override
    public void put(String operator, Object o) throws InterruptedException {
        queue.put(o);
    }

    @Override
    public Object take(String operator) throws InterruptedException {
        return queue.take();
    }
}

public static void test_queue() {
    test(new StockRoomBQueue());
}

ArrayBlockingQueue的源码就是用条件变量和Lock实现的:

构造器:

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

相关字段:

/** The queued items */
final Object[] items;

/** Number of elements in the queue */
int count;

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

put方法:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

private void enqueue(E x) {
    // assert lock.getHoldCount() == 1;
    // assert items[putIndex] == null;
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

take方法:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

private E dequeue() {
    // assert lock.getHoldCount() == 1;
    // assert items[takeIndex] != null;
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

java.util.concurrent包中的BlockingQueue是高效且线程安全的队列接口,有五个具体的实现类(详细见参考资料2):

  • ArrayBlockingQueue:由定长数组支持的有界拥塞队列。FIFO。
  • LinkedBlockingQueue:*(如果构造时指定大小就有界)的拥塞队列,单链表结构。FIFO。
  • PriorityBlockingQueue:*,平衡二叉堆的数组结构,保存的对象必须实现Comparable接口,按此比较规则来排序。迭代器的输出是二叉堆数组顺序,无法保证排序。
  • SynchronousQueue:没有内部容量,插入和取出需要成对使用,一个插入一个元素后还没结束就会阻塞,需要等待另一个线程取元素后,它们才一起结束。
  • DelayQueue:*,用优先队列实现,元素必须实现java.util.concurrent.Delayed接口,take方法会阻塞到第一个到期的内部元素返回,通常根据元素的过期时刻来排序。


参考资料推荐

  1. 用Java实现生产者消费者的几种方法
  2. 【Java并发之】BlockingQueue


上一篇:JDK1.8并发之synchronized和Lock
下一篇:JDK1.8结合框架之Set

相关标签: 并发