阻塞队列的简单实现
程序员文章站
2024-01-13 19:35:04
...
阻塞队列的简单实现
阻塞队列是一种当队列满了和队列空了时,会阻塞的队列,简单的实现如下,实现了添加元素的操作(put)和取元素的操作(take),这里用到了JUC下的Condition和ReentrantLock类
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by CvShrimp on 2017/7/19.
*/
public class BlockingQueue {
final Lock lock = new ReentrantLock();
final Condition full = lock.newCondition();
final Condition empty = lock.newCondition();
//设置存放元素的Object数组,最多存放30个元素
final Object[] items = new Object[30];
//放置元素的位置
int putptr;
//取元素的位置
int takeptr;
//队列中元素的个数,当达到队列的最大长度(这里是30)或者0时,分别会阻塞put或者take操作,当满足put或者take的条件时,会自动唤醒相应的线程
int count;
public void put(Object o) {
lock.lock();
System.out.println("lock the put");
try {
if(count == items.length) {
System.out.println("Full, blocking put");
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
items[putptr] = o;
count++;
if(++putptr == items.length) {
putptr = 0;
}
System.out.println("put item success");
//唤醒由于队列空了,阻塞在empty上的线程
empty.signal();
} finally {
System.out.println("unlock the put");
lock.unlock();
}
}
public Object take() {
lock.lock();
System.out.println("lock the take");
try {
if(count == 0) {
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Object o = items[takeptr];
count--;
if(++takeptr == items.length) {
takeptr = 0;
}
//唤醒由于队列满了,阻塞在full上的线程
full.signal();
return o;
} finally {
System.out.println("unlock the take");
lock.unlock();
}
}
public boolean isEmpty() {
return count == 0;
}
public static void main(String[] args) {
BlockingQueue blockQueue = new BlockingQueue();
Runnable runnable = new Runnable() {
@Override
public void run() {
blockQueue.put("Lelouch");
blockQueue.put("Zero");
blockQueue.put("CvShrimp");
System.out.println(blockQueue.take());
System.out.println(blockQueue.take());
}
};
Runnable runnable1 = new Runnable() {
@Override
public void run() {
System.out.println(blockQueue.take());
}
};
ExecutorService threadPool = Executors.newCachedThreadPool();
threadPool.execute(runnable);
threadPool.execute(runnable1);
}
}
上一篇: Dubbo超时