ArrayBlockingQueue 源码赏析
ArrayBlockingQueue为BlockingQueue的实现类,常用的阻塞队列,先来看一下次类的成员变量:
/** 存放数据的数组*/ final Object[] items; /**队列出口数据在数组中的下标*/ int takeIndex; /**队列入口在数组中的下标*/ int putIndex; /**存放数据数组的实际长度(被存放了数据的长度)*/ int count; /**并发用到的locak*/ final ReentrantLock lock; /**可以从队列里取出数据的条件变量*/ private final Condition notEmpty; /**向队列输入数据的条件变量*/ private final Condition notFull;
根据上面代码中的注释,大家应该知道成员变量的作用
下面我们来看几个常用的方法的源码:
1 构造方法:
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
由此构造方法来看,在创建ArrayBlockingQueue对象时,可以指定阻塞队列的长度,Lock的类型(公平锁或是非公平锁),以及向队列初始化一些数据,加入数据的时候要加锁!在finally语句块里释放锁
2 向队列添加元素 add offer put
public boolean add(E e) { return super.add(e); } public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } } public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } }
add 方法的具体实现在父类中:
public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); }
可以看到add的具体实现调用的是offer方法,如果队列已满,则会抛出IllegalStateException异常
在offer方法中,如果队列已满,则返回false(添加过程使用Lock,保证线程安全)
在put方法中,如果队列已满,条件变量notFull调用await方法,使其他向队列插入数据的线程阻塞
3 从队列取出元素 take poll peek
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } return extract(); } finally { lock.unlock(); } } public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : itemAt(takeIndex); } finally { lock.unlock(); } }
take 从阻塞队列里去取数据,整个过程加入中断锁,当队列中的数据为空时,notEmpty条件变量阻塞队列上的取数据的线程
poll 整个取数据的过程加锁, 支持设置等待时间
peek 此方法并不是取数据,而是读取数据,把队列出口处的数据返回,队列长度并没有减小,整个过程中加锁!
把ArrayBlockingQueue源码拿出来分析一下的原因是:阻塞队列实现线程阻塞和安全是由Condition和Lock一起来实现的,所以要深入掌握Lock和Condition的使用!
下一篇: Semaphore 使用案例
推荐阅读
-
Spring源码分析——调试环境搭建(可能是最省事的构建方法)
-
jQuery实现的动态文字变化输出效果示例【附演示与demo源码下载】
-
jQuery插件FusionCharts实现的MSBar2D图效果示例【附demo源码】
-
jQuery插件FusionCharts实现的Marimekko图效果示例【附demo源码】
-
浅谈vux之x-input使用以及源码解读
-
scrapy-redis源码分析之发送POST请求详解
-
jQuery插件HighCharts绘制的基本折线图效果示例【附demo源码下载】
-
centos源码编译php5 mcrypt模块步骤详解
-
【spring-boot 源码解析】spring-boot 依赖管理梳理图
-
Springboot源码分析之Spring循环依赖揭秘