Java concurrency集合之LinkedBlockingDeque_动力节点Java学院整理
linkedblockingdeque介绍
linkedblockingdeque是双向链表实现的双向并发阻塞队列。该阻塞队列同时支持fifo和filo两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全。
此外,linkedblockingdeque还是可选容量的(防止过度膨胀),即可以指定队列的容量。如果不指定,默认容量大小等于integer.max_value。
linkedblockingdeque原理和数据结构
linkedblockingdeque的数据结构,如下图所示:
说明:
1. linkedblockingdeque继承于abstractqueue,它本质上是一个支持fifo和filo的双向的队列。
2. linkedblockingdeque实现了blockingdeque接口,它支持多线程并发。当多线程竞争同一个资源时,某线程获取到该资源之后,其它线程需要阻塞等待。
3. linkedblockingdeque是通过双向链表实现的。
3.1 first是双向链表的表头。
3.2 last是双向链表的表尾。
3.3 count是linkedblockingdeque的实际大小,即双向链表中当前节点个数。
3.4 capacity是linkedblockingdeque的容量,它是在创建linkedblockingdeque时指定的。
3.5 lock是控制对linkedblockingdeque的互斥锁,当多个线程竞争同时访问linkedblockingdeque时,某线程获取到了互斥锁lock,其它线程则需要阻塞等待,直到该线程释放lock,其它线程才有机会获取lock从而获取cpu执行权。
3.6 notempty和notfull分别是“非空条件”和“未满条件”。通过它们能够更加细腻进行并发控制。
linkedblockingdeque函数列表
// 创建一个容量为 integer.max_value 的 linkedblockingdeque。 linkedblockingdeque() // 创建一个容量为 integer.max_value 的 linkedblockingdeque,最初包含给定 collection 的元素,以该 collection 迭代器的遍历顺序添加。 linkedblockingdeque(collection<? extends e> c) // 创建一个具有给定(固定)容量的 linkedblockingdeque。 linkedblockingdeque(int capacity) // 在不违反容量限制的情况下,将指定的元素插入此双端队列的末尾。 boolean add(e e) // 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头;如果当前没有空间可用,则抛出 illegalstateexception。 void addfirst(e e) // 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾;如果当前没有空间可用,则抛出 illegalstateexception。 void addlast(e e) // 以原子方式 (atomically) 从此双端队列移除所有元素。 void clear() // 如果此双端队列包含指定的元素,则返回 true。 boolean contains(object o) // 返回在此双端队列的元素上以逆向连续顺序进行迭代的迭代器。 iterator<e> descendingiterator() // 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。 int drainto(collection<? super e> c) // 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。 int drainto(collection<? super e> c, int maxelements) // 获取但不移除此双端队列表示的队列的头部。 e element() // 获取,但不移除此双端队列的第一个元素。 e getfirst() // 获取,但不移除此双端队列的最后一个元素。 e getlast() // 返回在此双端队列元素上以恰当顺序进行迭代的迭代器。 iterator<e> iterator() // 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),并在成功时返回 true;如果当前没有空间可用,则返回 false。 boolean offer(e e) // 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将在指定的等待时间内一直等待可用空间。 boolean offer(e e, long timeout, timeunit unit) // 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的开头,并在成功时返回 true;如果当前没有空间可用,则返回 false。 boolean offerfirst(e e) // 将指定的元素插入此双端队列的开头,必要时将在指定的等待时间内等待可用空间。 boolean offerfirst(e e, long timeout, timeunit unit) // 如果立即可行且不违反容量限制,则将指定的元素插入此双端队列的末尾,并在成功时返回 true;如果当前没有空间可用,则返回 false。 boolean offerlast(e e) // 将指定的元素插入此双端队列的末尾,必要时将在指定的等待时间内等待可用空间。 boolean offerlast(e e, long timeout, timeunit unit) // 获取但不移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。 e peek() // 获取,但不移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。 e peekfirst() // 获取,但不移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。 e peeklast() // 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素);如果此双端队列为空,则返回 null。 e poll() // 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),如有必要将在指定的等待时间内等待可用元素。 e poll(long timeout, timeunit unit) // 获取并移除此双端队列的第一个元素;如果此双端队列为空,则返回 null。 e pollfirst() // 获取并移除此双端队列的第一个元素,必要时将在指定的等待时间等待可用元素。 e pollfirst(long timeout, timeunit unit) // 获取并移除此双端队列的最后一个元素;如果此双端队列为空,则返回 null。 e polllast() // 获取并移除此双端队列的最后一个元素,必要时将在指定的等待时间内等待可用元素。 e polllast(long timeout, timeunit unit) // 从此双端队列所表示的堆栈中弹出一个元素。 e pop() // 将元素推入此双端队列表示的栈。 void push(e e) // 将指定的元素插入此双端队列表示的队列中(即此双端队列的尾部),必要时将一直等待可用空间。 void put(e e) // 将指定的元素插入此双端队列的开头,必要时将一直等待可用空间。 void putfirst(e e) // 将指定的元素插入此双端队列的末尾,必要时将一直等待可用空间。 void putlast(e e) // 返回理想情况下(没有内存和资源约束)此双端队列可不受阻塞地接受的额外元素数。 int remainingcapacity() // 获取并移除此双端队列表示的队列的头部。 e remove() // 从此双端队列移除第一次出现的指定元素。 boolean remove(object o) // 获取并移除此双端队列第一个元素。 e removefirst() // 从此双端队列移除第一次出现的指定元素。 boolean removefirstoccurrence(object o) // 获取并移除此双端队列的最后一个元素。 e removelast() // 从此双端队列移除最后一次出现的指定元素。 boolean removelastoccurrence(object o) // 返回此双端队列中的元素数。 int size() // 获取并移除此双端队列表示的队列的头部(即此双端队列的第一个元素),必要时将一直等待可用元素。 e take() // 获取并移除此双端队列的第一个元素,必要时将一直等待可用元素。 e takefirst() // 获取并移除此双端队列的最后一个元素,必要时将一直等待可用元素。 e takelast() // 返回以恰当顺序(从第一个元素到最后一个元素)包含此双端队列所有元素的数组。 object[] toarray() // 返回以恰当顺序包含此双端队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。 <t> t[] toarray(t[] a) // 返回此 collection 的字符串表示形式。 string tostring()
下面从arrayblockingqueue的创建,添加,取出,遍历这几个方面对linkedblockingdeque进行分析
1. 创建
下面以linkedblockingdeque(int capacity)来进行说明。
public linkedblockingdeque(int capacity) { if (capacity <= 0) throw new illegalargumentexception(); this.capacity = capacity; }
说明:capacity是“链式阻塞队列”的容量。
linkedblockingdeque中相关的数据结果定义如下:
// “双向队列”的表头 transient node<e> first; // “双向队列”的表尾 transient node<e> last; // 节点数量 private transient int count; // 容量 private final int capacity; // 互斥锁 , 互斥锁对应的“非空条件notempty”, 互斥锁对应的“未满条件notfull” final reentrantlock lock = new reentrantlock(); private final condition notempty = lock.newcondition(); private final condition notfull = lock.newcondition();
说明:lock是互斥锁,用于控制多线程对linkedblockingdeque中元素的互斥访问;而notempty和notfull是与lock绑定的条件,它们用于实现对多线程更精确的控制。
双向链表的节点node的定义如下:
static final class node<e> { e item; // 数据 node<e> prev; // 前一节点 node<e> next; // 后一节点 node(e x) { item = x; } }
2. 添加
下面以offer(e e)为例,对linkedblockingdeque的添加方法进行说明。
public boolean offer(e e) { return offerlast(e); }
offer()实际上是调用offerlast()将元素添加到队列的末尾。
offerlast()的源码如下:
public boolean offerlast(e e) { if (e == null) throw new nullpointerexception(); // 新建节点 node<e> node = new node<e>(e); final reentrantlock lock = this.lock; // 获取锁 lock.lock(); try { // 将“新节点”添加到双向链表的末尾 return linklast(node); } finally { // 释放锁 lock.unlock(); } }
说明:offerlast()的作用,是新建节点并将该节点插入到双向链表的末尾。它在插入节点前,会获取锁;操作完毕,再释放锁。
linklast()的源码如下:
private boolean linklast(node<e> node) { // 如果“双向链表的节点数量” > “容量”,则返回false,表示插入失败。 if (count >= capacity) return false; // 将“node添加到链表末尾”,并设置node为新的尾节点 node<e> l = last; node.prev = l; last = node; if (first == null) first = node; else l.next = node; // 将“节点数量”+1 ++count; // 插入节点之后,唤醒notempty上的等待线程。 notempty.signal(); return true; }
说明:linklast()的作用,是将节点插入到双向队列的末尾;插入节点之后,唤醒notempty上的等待线程。
3. 删除
下面以take()为例,对linkedblockingdeque的取出方法进行说明。
public e take() throws interruptedexception { return takefirst(); }
take()实际上是调用takefirst()队列的第一个元素。
takefirst()的源码如下:
public e takefirst() throws interruptedexception { final reentrantlock lock = this.lock; // 获取锁 lock.lock(); try { e x; // 若“队列为空”,则一直等待。否则,通过unlinkfirst()删除第一个节点。 while ( (x = unlinkfirst()) == null) notempty.await(); return x; } finally { // 释放锁 lock.unlock(); } }
说明:takefirst()的作用,是删除双向链表的第一个节点,并返回节点对应的值。它在插入节点前,会获取锁;操作完毕,再释放锁。
unlinkfirst()的源码如下:
private e unlinkfirst() { // assert lock.isheldbycurrentthread(); node<e> f = first; if (f == null) return null; // 删除并更新“第一个节点” node<e> n = f.next; e item = f.item; f.item = null; f.next = f; // help gc first = n; if (n == null) last = null; else n.prev = null; // 将“节点数量”-1 --count; // 删除节点之后,唤醒notfull上的等待线程。 notfull.signal(); return item; }
说明:unlinkfirst()的作用,是将双向队列的第一个节点删除;删除节点之后,唤醒notfull上的等待线程。
4. 遍历
下面对linkedblockingdeque的遍历方法进行说明。
public iterator<e> iterator() { return new itr(); }
iterator()实际上是返回一个iter对象。
itr类的定义如下:
private class itr extends abstractitr { // “双向队列”的表头 node<e> firstnode() { return first; } // 获取“节点n的下一个节点” node<e> nextnode(node<e> n) { return n.next; } }
itr继承于abstractitr,而abstractitr的定义如下:
private abstract class abstractitr implements iterator<e> { // next是下一次调用next()会返回的节点。 node<e> next; // nextitem是next()返回节点对应的数据。 e nextitem; // 上一次next()返回的节点。 private node<e> lastret; // 返回第一个节点 abstract node<e> firstnode(); // 返回下一个节点 abstract node<e> nextnode(node<e> n); abstractitr() { final reentrantlock lock = linkedblockingdeque.this.lock; // 获取“linkedblockingdeque的互斥锁” lock.lock(); try { // 获取“双向队列”的表头 next = firstnode(); // 获取表头对应的数据 nextitem = (next == null) ? null : next.item; } finally { // 释放“linkedblockingdeque的互斥锁” lock.unlock(); } } // 获取n的后继节点 private node<e> succ(node<e> n) { // chains of deleted nodes ending in null or self-links // are possible if multiple interior nodes are removed. for (;;) { node<e> s = nextnode(n); if (s == null) return null; else if (s.item != null) return s; else if (s == n) return firstnode(); else n = s; } } // 更新next和nextitem。 void advance() { final reentrantlock lock = linkedblockingdeque.this.lock; lock.lock(); try { // assert next != null; next = succ(next); nextitem = (next == null) ? null : next.item; } finally { lock.unlock(); } } // 返回“下一个节点是否为null” public boolean hasnext() { return next != null; } // 返回下一个节点 public e next() { if (next == null) throw new nosuchelementexception(); lastret = next; e x = nextitem; advance(); return x; } // 删除下一个节点 public void remove() { node<e> n = lastret; if (n == null) throw new illegalstateexception(); lastret = null; final reentrantlock lock = linkedblockingdeque.this.lock; lock.lock(); try { if (n.item != null) unlink(n); } finally { lock.unlock(); } } }
linkedblockingdeque示例
import java.util.*; import java.util.concurrent.*; /* * linkedblockingdeque是“线程安全”的队列,而linkedlist是非线程安全的。 * * 下面是“多个线程同时操作并且遍历queue”的示例 * (01) 当queue是linkedblockingdeque对象时,程序能正常运行。 * (02) 当queue是linkedlist对象时,程序会产生concurrentmodificationexception异常。 * * */ public class linkedblockingdequedemo1 { // todo: queue是linkedlist对象时,程序会出错。 //private static queue<string> queue = new linkedlist<string>(); private static queue<string> queue = new linkedblockingdeque<string>(); public static void main(string[] args) { // 同时启动两个线程对queue进行操作! new mythread("ta").start(); new mythread("tb").start(); } private static void printall() { string value; iterator iter = queue.iterator(); while(iter.hasnext()) { value = (string)iter.next(); system.out.print(value+", "); } system.out.println(); } private static class mythread extends thread { mythread(string name) { super(name); } @override public void run() { int i = 0; while (i++ < 6) { // “线程名” + "-" + "序号" string val = thread.currentthread().getname()+i; queue.add(val); // 通过“iterator”遍历queue。 printall(); } } } }
(某一次)运行结果:
ta1, ta1, tb1, tb1, ta1, ta1, tb1, tb1, tb2, tb2, ta2, ta2, ta1, ta1, tb1, tb1, tb2, tb2, ta2, ta2, tb3, tb3, ta3, ta3, ta1, tb1, ta1, tb2, tb1, ta2, tb2, tb3, ta2, ta3, tb3, tb4, ta3, ta4, tb4, ta1, ta4, tb1, tb5, tb2, ta1, ta2, tb1, tb3, tb2, ta3, ta2, tb4, tb3, ta4, ta3, tb5, tb4, ta5, ta4, ta1, tb5, tb1, ta5, tb2, tb6, ta2, ta1, tb3, tb1, ta3, tb2, tb4, ta2, ta4, tb3, tb5, ta3, ta5, tb4, tb6, ta4, ta6, tb5, ta5, tb6, ta6,
结果说明:示例程序中,启动两个线程(线程ta和线程tb)分别对linkedblockingdeque进行操作。以线程ta而言,它会先获取“线程名”+“序号”,然后将该字符串添加到linkedblockingdeque中;接着,遍历并输出linkedblockingdeque中的全部元素。 线程tb的操作和线程ta一样,只不过线程tb的名字和线程ta的名字不同。
当queue是linkedblockingdeque对象时,程序能正常运行。如果将queue改为linkedlist时,程序会产生concurrentmodificationexception异常。
推荐阅读
-
Java concurrency集合之LinkedBlockingDeque_动力节点Java学院整理
-
Java concurrency之Condition条件_动力节点Java学院整理
-
Java concurrency之CountDownLatch原理和示例_动力节点Java学院整理
-
Filter、Servlet、Listener的学习_动力节点Java学院整理
-
ehcache开源缓存框架_动力节点Java学院整理
-
servlet之cookie简介_动力节点Java学院整理
-
tomcat目录结构简介_动力节点Java学院整理
-
servlet之session工作原理简介_动力节点Java学院整理
-
Dom4j解析XML_动力节点Java学院整理
-
Java中Random简介_动力节点Java学院整理