手写线程池
程序员文章站
2022-05-03 23:47:09
文章目录阻塞队列coreSize用完了,队列也满了采用了 才用这个接口的实现类的方法线程池测试本文代码可能会因为某些地方没有加锁,会抛出异常!请谨慎食用。写这个只是为了更好地了解线程池的实现。阻塞队列public class BlockQueue { private Deque queue=new ArrayDeque<>(); private int capcity; private ReentrantLock lock=new...
本文代码可能会因为某些地方没有加锁,会抛出异常!请谨慎食用。写这个只是为了更好地了解线程池的实现。
阻塞队列
public class BlockQueue<T> {
private Deque<T> queue=new ArrayDeque<>();
private int capcity;
private ReentrantLock lock=new ReentrantLock();
private Condition emptyWait=lock.newCondition();
private Condition fullWait=lock.newCondition();
public BlockQueue(int capcity) {
this.capcity = capcity;
}
public int getSize(){
return queue.size();
}
public T take(){
lock.lock();
try{
while(queue.isEmpty()){
try {
emptyWait.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t=queue.removeFirst();
fullWait.signal();
return t;
}finally{
lock.unlock();
}
}
public void put(T task,RejectPolicy<T> rejectPolicy){
lock.lock();
try{
if(queue.size()==capcity){
rejectPolicy.reject(this,task);
}else{
queue.addLast(task);
}
}finally {
lock.unlock();
}
}
public T timeTake(long timeout, TimeUnit timeUnit) {
lock.lock();
try{
long nanos=timeUnit.toNanos(timeout);
while(queue.isEmpty()){
try {
if(nanos<=0) return null;
nanos=emptyWait.awaitNanos(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t=queue.removeFirst();
fullWait.signal();
return t;
}finally{
lock.unlock();
}
}
public boolean timePut(T task,long timeout,TimeUnit timeUnit){
lock.lock();
try{
long nanos=timeUnit.toNanos(timeout);
while(queue.size()==capcity){
try {
if(nanos<=0) return false;
nanos=fullWait.awaitNanos(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(task);
emptyWait.signal();
return true;
}finally{
lock.unlock();
}
}
}
coreSize用完了,队列也满了采用了 才用这个接口的实现类的方法
public interface RejectPolicy<T> {
void reject(BlockQueue<T> taskQueue,T task);
}
线程池
public class ThreadPool {
private BlockQueue<Runnable> taskQueue;
private int coreSize;
private HashSet<Worker> workers=new HashSet<>();
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int capcity,int coreSize,RejectPolicy<Runnable> rejectPolicy){
this.taskQueue=new BlockQueue<>(capcity);
this.coreSize=coreSize;
this.rejectPolicy=rejectPolicy;
}
public void excute(Runnable task){
synchronized (workers){
if(workers.size()<coreSize){
Worker worker=new Worker(task);
worker.start();
workers.add(worker);
}else{
taskQueue.put(task,rejectPolicy);
}
}
}
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task=task;
}
@Override
public void run(){
while(task!=null||(task=taskQueue.timeTake(1000,TimeUnit.MILLISECONDS))!=null){
try{
task.run();
}catch(RuntimeException e){
e.printStackTrace();
}
task=null;
}
workers.remove(this);
}
}
}
测试
public class Test {
public static void main(String[] args) {
ThreadPool threadPool=new ThreadPool(2,2,(queue,task)->{
//什么也不做
//task.run(); 让主线程去执行
//throw new RuntimeException("阻塞队列容量不够 直接不管执行任务失败"+task);//抛出异常
//boolean flag=queue.timePut(task,1000, TimeUnit.MILLISECONDS);
//if(flag==false) throw new RuntimeException("阻塞队列容量不够,超时获取也没获取得到"+task);
});
threadPool.excute(()->{
System.out.println("fnq是小狗");
});
threadPool.excute(()->{
System.out.println("swt是小猪猪");
});
}
}
本文地址:https://blog.csdn.net/qq_42576687/article/details/109268564