ReentrantLock,Condition
程序员文章站
2022-04-19 08:22:58
...
public class ReentrantLockAndConditionTest { public static void main(String[] args) { ReentrantLockQueue queue =new ReentrantLockQueue(); for (int i = 0; i < 100; i++) { queue.put("a"); String string = queue.getString(); System.out.println(string); } } public abstract class MessageQueue<T>{ private Queue<T> queue; private List<FailedMessageWrap> resendList; protected int resendSleepInterval = 1000 * 60 ; protected int maxFailedCount = 10; private Lock sendLock = new ReentrantLock(); private Condition sendCondition = sendLock.newCondition(); private Lock resendLock = new ReentrantLock(); private volatile boolean stopRequired ; public MessageQueue(){ queue = new LinkedList<T>(); resendList = new LinkedList<FailedMessageWrap>(); stopRequired = false; ExecutorService sendService = Executors.newFixedThreadPool(1); for (int i = 0; i < 1; i++) { sendService.execute(new SendTask()); } Executors.newSingleThreadExecutor().execute(new ResendTask()); } public void send(T message){ if(message == null){ return; } try { sendLock.lock(); queue.add(message); sendCondition.signalAll(); }finally{ sendLock.unlock(); } } public void stop(){ stopRequired = true; } protected abstract boolean doSend(T message); class FailedMessageWrap{ private T message; private int failedCount; FailedMessageWrap(T message){ this.message = message; failedCount = 0; } public int getFailedCount() { return failedCount; } public void increaseFailedCount() { this.failedCount += 1; } public T getMessage() { return message; } } class SendTask implements Runnable{ @Override public void run() { while(!stopRequired){ T message; try { sendLock.lock(); message = queue.poll(); if(message == null){ try { sendCondition.await(); } catch (Exception e) { e.printStackTrace(); } continue; } }finally{ sendLock.unlock(); } if(!doSend(message)){ try { resendLock.lock(); resendList.add(new FailedMessageWrap(message)); } finally{ resendLock.unlock(); } } } } } class ResendTask implements Runnable{ @Override public void run() { while(!stopRequired){ try { Thread.sleep(resendSleepInterval); } catch (InterruptedException e) { e.printStackTrace(); } List<FailedMessageWrap> removeList = new ArrayList<FailedMessageWrap>(); try { resendLock.lock(); for(FailedMessageWrap messageWrap : resendList){ if(messageWrap.getFailedCount() > maxFailedCount){ removeList.add(messageWrap); continue; } T message = messageWrap.getMessage(); if(!doSend(message)){ messageWrap.increaseFailedCount(); }else{ removeList.add(messageWrap); } } for (FailedMessageWrap messageWrap : removeList) { resendList.remove(messageWrap); } } finally{ resendLock.unlock(); } } } } } public static class ReentrantLockQueue{ private ReentrantLock lock = new ReentrantLock(); private Queue<String> queue = new LinkedList<String>(); public void put(String s){ try{ lock.lock(); queue.add(s); }catch(Exception e){ }finally{ lock.unlock(); } } public String getString(){ try{ lock.lock(); String poll = queue.poll(); return poll; }catch(Exception e){ }finally{ lock.unlock(); } return null; } } }
捐助开发者
在兴趣的驱动下,写一个免费
的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(支持支付宝和微信 以及扣扣群),没钱捧个人场,谢谢各位。
个人主页:http://knight-black-bob.iteye.com/
谢谢您的赞助,我会做的更好!
推荐阅读
-
java ReentrantLock详解
-
Java多线程中ReentrantLock与Condition详解
-
Java concurrency之Condition条件_动力节点Java学院整理
-
深入理解java内置锁(synchronized)和显式锁(ReentrantLock)
-
AbstractQueuedSynchronizer源码分析(ReentrantLock锁的实现)
-
Python线程条件变量Condition原理解析
-
Python多线程编程(七):使用Condition实现复杂同步
-
python使用threading.Condition交替打印两个字符
-
Java并发之条件阻塞Condition的应用代码示例
-
Java并发——结合CountDownLatch源码、Semaphore源码及ReentrantLock源码来看AQS原理