【并发】8、借助redis 实现多线程生产消费阻塞队列
程序员文章站
2022-06-28 21:57:53
顾名思义这个就是再消费的时候,不是之前的那哥用yield进行线程切换的操作,而是用线程等待阻塞的方式去执行,说实话我感觉效率不一定有之前那个好, 因为我对这种阻塞队列使用的时候,之前有发现阻塞队列,塞着塞着线程就会进入假死状态,这个很奇怪,但是有的时候又是好的,这个也不清楚到底是为什么 但是毕竟也是 ......
顾名思义这个就是再消费的时候,不是之前的那哥用yield进行线程切换的操作,而是用线程等待阻塞的方式去执行,说实话我感觉效率不一定有之前那个好,
因为我对这种阻塞队列使用的时候,之前有发现阻塞队列,塞着塞着线程就会进入假死状态,这个很奇怪,但是有的时候又是好的,这个也不清楚到底是为什么
但是毕竟也是一种实现,我就写出来了看看吧
生产者
package queue.redisqueue; import queue.fqueue.vo.tempvo; import redis.clients.jedis.jedis; import java.io.bytearrayoutputstream; import java.io.objectoutputstream; import java.util.uuid; /** * @projectname: cutter-point * @package: queue.redisqueue * @classname: redisqueueproducter2 * @author: xiaof * @description: ${description} * @date: 2019/6/12 16:29 * @version: 1.0 */ public class redisqueueproducter2 implements runnable { private jedis jedis; private string queuekey; public redisqueueproducter2(jedis jedis, string queuekey) { this.jedis = jedis; this.queuekey = queuekey; } @override public void run() { while(true) { try { thread.sleep((long) (math.random() * 1000)); //不存在则创建,存在则直接插入 //向redis队列中存放数据 //生成数据 tempvo tempvo = new tempvo(); tempvo.setname(thread.currentthread().getname() + ",time is:" + uuid.randomuuid()); //序列化为字节 bytearrayoutputstream arrayoutputstream = new bytearrayoutputstream(); objectoutputstream objectoutputstream = new objectoutputstream(arrayoutputstream); objectoutputstream.writeobject(tempvo); arrayoutputstream.flush(); try { int i = 0; while(i < 10) { long num = jedis.lpush(queuekey.getbytes(), arrayoutputstream.tobytearray()); if(num > 0) { system.out.println("成功!"); break; } ++i; } } catch (exception e) { system.out.println("失败!"); // long num = jedis.lpush(queuekey.getbytes(), arrayoutputstream.tobytearray()); } } catch (exception e) { e.printstacktrace(); } } } }
消费者
package queue.redisqueue; import queue.fqueue.vo.eventvo; import redis.clients.jedis.jedis; import java.io.bytearrayinputstream; import java.io.ioexception; import java.io.objectinputstream; import java.util.list; /** * @projectname: cutter-point * @package: queue.redisqueue * @classname: redisqueueconsume2 * @author: xiaof * @description: ${description} * @date: 2019/6/12 16:40 * @version: 1.0 */ public class redisqueueconsume2 implements runnable { private jedis jedis; private string queuekey; public redisqueueconsume2(jedis jedis, string queuekey) { this.jedis = jedis; this.queuekey = queuekey; } @override public void run() { while(true) { list<byte[]> byteslist = null; try{ //这种就是阻塞队列模式 byteslist = jedis.blpop(0, queuekey.getbytes()); } catch (exception e) { } //反序列化对象 if(byteslist == null || byteslist.size() <= 0) { thread.yield(); continue; } //获取第二个对象,就是我们的字节数组 system.out.println(new string(byteslist.get(0))); bytearrayinputstream bytearrayinputstream = new bytearrayinputstream(byteslist.get(1)); try { objectinputstream objectinputstream = new objectinputstream(bytearrayinputstream); eventvo eventvo = (eventvo) objectinputstream.readobject(); eventvo.dooperater(); } catch (ioexception e) { e.printstacktrace(); } catch (classnotfoundexception e) { e.printstacktrace(); } } } }
测试代码
消费队列
接下来我们把生产线程停掉
此时队列还有
我们把它消费完
当只剩最后一个的时候
可以进入下一步,好当队列为空的时候,我们再尝试去取数据的时候
队列会阻塞再这个地方,相当于是挂起线程