读源码之:ArrayBlockingQueue 博客分类: Java java
程序员文章站
2024-02-04 15:38:58
...
ArrayBlockingQueue是concurrent包提供的一个线程安全的队列,由一个数组来保存队列元素.通过takeIndex和putIndex来分别记录出队列和入队列的下标,以保证在出队列时不进行元素移动.
最后需要注意的就是带超时唤醒的是offer和poll而不是put和take
//在出队列或者入队列的时候对takeIndex或者putIndex进行累加,如果已经到了数组末尾就又从0开始,保证数组的循环使用. final int inc(int i) { return (++i == items.length) ? 0 : i; } //入队列操作 private void insert(E x) { items[putIndex] = x; putIndex = inc(putIndex); ++count; notEmpty.signal();//唤醒等待的出队线程 } //使用offer入队列,如果队列已满就立即返回false public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } } //使用put入队列的话,如果队列已满当前线程就等待然后释放锁,直到被notFull唤醒,再重新检查,直到成功插入队列 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } //出队列操作 private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]); items[takeIndex] = null; takeIndex = inc(takeIndex); --count; notFull.signal();//唤醒等待入队的线程 return x; } //poll出队是不需要等待的,如果当前队列是空就直接返回null public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : extract(); } finally { lock.unlock(); } } //take就跟put一样,如果队列是空的就等待直到被notEmpty唤醒 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } }
最后需要注意的就是带超时唤醒的是offer和poll而不是put和take