欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

3.Java数据结构原理解析-Queue系列

程序员文章站 2022-06-08 08:10:30
...

Queue,也就是队列,满足FIFO的特性。
在Java中,Queue是一个接口,它的实现类有很多,其中非线程安全的代表是LinkedList,线程安全的有阻塞和非阻塞的,阻塞的大都实现了Queue的子接口BlockingQueue(阻塞队列),例如:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue等。非阻塞的有ConcurrentLinkedQueue。

Queue接口方法定义:

//添加元素,成功返回true,容量不够抛IllegalStateException
boolean add(E e)
//添加元素,成功返回true,容量不足返回false
boolean offer(E e)

//移除队首元素,队列为空时抛NoSuchElementException
E remove()
//移除队首元素,队列为空时返回null
E poll()

//查看队首元素,队列为空时抛NoSuchElementException
E element()
//查看队首元素,队列为空时返回null
E peek()

BlockingQueue接口定义(BlockingQueue除了继承Queue定义的方法外,还加入了自己的阻塞方法):

//添加元素,容量不足阻塞
void put(E e) throws InterruptedException
//添加元素,容量不足时等待指定时间
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException

//移除队首元素,队列为空时阻塞
E take() throws InterruptedException
//移除队首元素,队列为空时等待指定时间
E poll(long timeout, TimeUnit unit) throws InterruptedException

队列大多数是在多线程环境下使用的,生产者线程往队列中添加元素,消费者线程从队列中取出元素。所以,下面重点讨论ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue是采用什么样的数据结构和算法来保证队列的线程安全性的。

1.阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue底层的数据结构是数组和循环队列,使用一个可重入锁和这个锁的两个条件对象进行并发控制。
首先,来看看ArrayBlockingQueue的属性。

//存放元素的数组
final Object[] items;
//循环队列头指针,起始值为0
int takeIndex;
//循环队列尾指针,指向下一个元素插入的位置,起始值为0
int putIndex;
//元素的个数
int count;

//可重入锁(被final修饰,之所以没有初始化,是因为所有的构造方法里面都对lock进行了初始化)
final ReentrantLock lock;
//队列非空条件
private final Condition notEmpty;
//队列未满条件
private final Condition notFull;

ArrayBlockingQueue的长度是固定的,无法扩容,所以创建一个ArrayBlockingQueue对象时,必须指定队列的容量,并且ArrayBlockingQueue不允许原始为null。从构造函数上可以看出这一点。

//创建一个指定容量的队列,锁默认是非公平的
public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}
//创建一个指定容量、指定锁的公平性的队列
public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    //创建锁
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

//用现有的集合创建一个指定容量、指定锁的公平性的队列
public ArrayBlockingQueue(int capacity, boolean fair,
                          Collection<? extends E> c) {
    //创建队列
    this(capacity, fair);

    final ReentrantLock lock = this.lock;
    lock.lock(); // Lock only for visibility, not mutual exclusion(锁定只用于可见性,而不是互斥)
    try {
        int i = 0;
        try {
            for (E e : c) {
                checkNotNull(e);
                items[i++] = e;
            }
        } catch (ArrayIndexOutOfBoundsException ex) {
            throw new IllegalArgumentException();
        }
        count = i;
        //尾指针
        putIndex = (i == capacity) ? 0 : i;
    } finally {
        lock.unlock();
    }
}

(1)插入元素add、offer、put

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        if (count == items.length)
            return false;
        else {
            insert(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}

private void insert(E x) {
    items[putIndex] = x;
    //队列尾指针+1
    putIndex = inc(putIndex);
    ++count;
    //通知在notEmpty上等待的线程
    notEmpty.signal();
}

//循环加。循环队列的实现就体现在这里
final int inc(int i) {
    return (++i == items.length) ? 0 : i;
}

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    //在锁上等待,直到获取锁,但是会响应中断,优先考虑响应中断,而不是响应锁的普通获取或重入获取。
    //不明白为什么add和offer方法使用lock,而put方法使用lockInterruptibly?
    lock.lockInterruptibly();
    try {
        //队列已满,在notFull对象上等待
        while (count == items.length)
            notFull.await();
        insert(e);
    } finally {
        lock.unlock();
    }
}

(2)取出元素remove、poll、take

public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

public E poll() {
   final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //队列为空时返回null
        return (count == 0) ? null : extract();
    } finally {
        lock.unlock();
    }
}

private E extract() {
    final Object[] items = this.items;
    E x = this.<E>cast(items[takeIndex]);
    items[takeIndex] = null;
    //队列头指针+1
    takeIndex = inc(takeIndex);
    --count;
    //通知在notFull对象上等待的线程
    notFull.signal();
    return x;
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        //队列为空时,在notEmpty上等待
        while (count == 0)
            notEmpty.await();
        return extract();
    } finally {
        lock.unlock();
    }
}

2.阻塞队列 LinkedBlockingQueueue

LinkedBlockingQueueue底层的数据结构是单向链表,使用两个可重入锁(放锁和拿锁)和对象的条件对象来进行并发控制。
LinkedBlockingQueueue由于使用了两个锁,所以允许同时添加和取出元素。这一点是和ArrayBlockingQueue最大的区别。

一个类的属性体现了这个类的数据结构,我们首先看看LinkedBlockingQueueue的属性

//链表的节点。从节点可以看出该链表只有一个next指针,是单向的,
static class Node<E> {
    E item;

    Node<E> next;

    Node(E x) { item = x; }
}

//队列的容量,定义为final,说明所有的构造方法必须初始化容量
private final int capacity;

//元素的个数,因为使用了放锁和拿锁两个锁,所以同时添加和取出元素时存在并发问题,使用原子操作来保证元素的个数的准确性
private final AtomicInteger count = new AtomicInteger(0);

//单向链表头指针,head.item永远为null。(定义为transient说明不能序列化)
private transient Node<E> head;

//单向链表尾指针,last.next永远为null。(定义为transient说明不能序列化)
private transient Node<E> last;

//拿锁(控制remove、poll、take方法等)
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();

//放锁(控制add、offer、put方法等)
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

照常理来说,取出一个元素后,队列应该是notFull,那么拿锁控制的是应该是notFull的条件变量,但是因为此处存在两把锁,可能在取出元素后,又有元素加入了。所有此处拿锁控制的是notEmpty,取出元素后,只要判断剩下的元素是否大于1就可以了,因为不可能有两个线程同时执行取操作。

(1)插入元素add、offer、put

//add方法是在AbstractQueue实现了,所以跟ArrayBlockingQueue一样
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node(e);
    //锁定放锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //队列未满
        if (count.get() < capacity) {
            enqueue(node);
            //队列长度+1
            c = count.getAndIncrement();
            //插入之后,队列还是未满,通知在notFull对象上的等待的线程(例如:put方法)
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    //c==0表示插入之前队列为空,队列为空说明可能有读线程在阻塞,如果c>0,说明肯定没有读线程在阻塞
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

//signalNotEmpty虽然用在offder/put中,但是从不在putLock的同步区内。这样就保证同一时刻只持有一个锁,这样就不会出现死锁问题。
//???关于此处为什么加锁的问题,暂时就是这样理解的
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

//入队。入队操作很简单,就是将链表尾指针的next节点指向当前节点,并把当前节点设置为尾指针
private void enqueue(Node<E> node) {
    last = last.next = node;
}

(2)取出操作remove、poll、take

//remove()方法是在AbstractQueue中实现了,跟ArrayBlockingQueue一样
public E remove() {
    E x = poll();
    if (x != null)
        return x;
    else
        throw new NoSuchElementException();
}

public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            //出队
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

//出队操作比较简单,就是将单链表的头指针指向下一个元素
private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    //不是很明白这个,如果要帮助GC,直接将h.next=null不是更好吗?
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            //在notEmpty条件上等待
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}    

3.阻塞队列 SynchronousQueue

  SynchronousQueue跟上面两个阻塞队列不同,它内部没有容器,一个生产线程put的时候,如果当前没有消费线程执行take,此生产线程必须阻塞,等待一个消费线程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的数据(即数据传递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。

可以参考:https://zhuanlan.zhihu.com/p/29227508
https://www.jianshu.com/p/376d368cb44f?hmsr=toutiao.io&utm_medium=toutiao.io&utm_source=toutiao.io

4.非阻塞队列 ConcurrentLinkedQueue

ArrayBlockingQueue和LinkedBlockingQueue都是阻塞的,阻塞体现在入队和出队的时候需要加锁。
下面介绍的ConcurrentLinkedQueue是非阻塞的,ConcurrentLinkedQueue底层的数据结构和LinkedBlockingQueue相同,也是使用单链表,不同的是ConcurrentLinkedQueue通过sun.misc.Unsafe类的CAS操作来保证线程安全的。

Unsafe类提供了硬件级别的原子操作,主要compareAndSwapXXX方法实现。
关于Unsafe,网上有很多资源,请自行查阅。

我们首先来看看ConcurrentLinkedQueue的成员变量。

//单链表头节点
private transient volatile Node<E> head;

//单链表尾节点
private transient volatile Node<E> tail;

//节点类型。与LinkedBlockingQueue不同的是,所有的赋值操作都是通过Unsafe对象的CAS来完成的,所以是线程安全的
private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    //为item赋值
    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    //为next指针赋值
    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            //获取item属性和next属性的内存地址
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

(1)入队add、offer
3.Java数据结构原理解析-Queue系列
需要注意的是,每次入队之后,tail并不是总指向最后一个节点。奇数时是倒数第二个节点,偶数时是第一个节点。

public boolean add(E e) {
    return offer(e);
}

public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        //获得p的下一个节点
        Node<E> q = p.next;
        //如果下一个节点是null,也就是p节点就是尾节点
        if (q == null) {
            //将单链表的尾节点的next指针指向新节点
            if (p.casNext(null, newNode)) {
                if (p != t)
                     //如果tail不是尾节点则将入队节点设置为tail。
                     // 如果失败了,那么说明有其他线程已经把tail移动过
                    casTail(t, newNode);
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        // 如果p节点等于p的next节点,则说明p节点和q节点都为空,表示队列刚初始化,所以返回
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

//为队列的尾节点赋值
private boolean casTail(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
}

可参考:http://blog.csdn.net/u013991521/article/details/53068549

相关标签: queue