java多线程消息队列的实现代码
程序员文章站
2023-11-22 16:28:28
本文介绍了java多线程消息队列的实现代码,分享给大家,希望对大家有帮助,顺便也自己留个笔记
1、定义一个队列缓存池:
//static修饰的成员变量和成...
本文介绍了java多线程消息队列的实现代码,分享给大家,希望对大家有帮助,顺便也自己留个笔记
1、定义一个队列缓存池:
//static修饰的成员变量和成员方法独立于该类的任何对象。也就是说,它不依赖类特定的实例,被类的所有实例共享。 private static list<queue> queuecache = new linkedlist<queue>();
2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。
private integer offermaxqueue = 2000;
3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中
new thread(){ public void run(){ while(true){ string ip = null; try { synchronized (queuecache) { integer size = queuecache.size(); if(size==0){ //队列缓存池没有消息,等待。。。。 queuecache.wait(); } queue queue = queuecache.remove(0); if(isiplock(queuestr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理 queuecache.add(queue);该queue重新加入队列缓冲池,滞后处理, continue; }else{ ;//这里是处理该消息的操作。 } size = queuecache.size(); if(size<offermaxqueue&&size>=0){ queuecache.notifyall();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。 } } } catch (exception e) { e.printstacktrace(); }finally{ try {//检出该消息队列的锁 uniplock(queuestr); } catch (execption e) {//捕获异常,不能让线程挂掉 e.printstacktrace(); } } } }.start();
4、检入队列
synchronized (queuecache) { while(true){ integer size = queuecache.size(); if(size>=offermaxqueue){ try { queuecache.wait(); continue;//继续执行等待中的检入任务。 } catch (interruptedexception e) { e.printstacktrace(); } }//if if(size<=offermaxqueue&&size>0){ queuecache.notifyall(); } break;//检入完毕 }//while }
5、锁方法实现
/** * 锁 * @param ip * @return * @throws */ public boolean islock(string queuestr) { return this.redismanager.setnx(queuestr+"_lock", "lock", 10000)!=1; } //解锁 public void uniplock(string queuestr) { if(ip!=null){ this.redismanager.del(queuestr+"_lock"); // lock.unlock(); } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。