理解队列、消息队列--用redis实现消息队列
理解队列和消息队列
队列(来自百度百科):是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。
消息队列(来自百度百科):是在消息的传输过程中保存消息的容器。
从队列和消息队列的定义看来,看不出什么相似之处。但我理解它们的作用是相似的,只是使用环境不同。队列和消息队列 本质上都可以用于解决“生产者”和“消费者”问题,在二者这间建立桥梁,it中专业术语是对“生产者”和“消费者”进行解耦。可以动态的通过调整“生产者”和“消费者”线程数或服务器实例数,在正常情况使消费和生产到达一个平衡;在高峰情况下(生产者大于消费者)可以保护消费者不被拖垮的同时,还可以对把积压的数据保存下来,消费者可以延迟消费这些数据进行处理。
队列 一般指的是单个服务实例内部使用,比如,在java中的一个jvm实例内部可以使用Queue的子类(Deque:双端队列,是Queue的子接口),比如:单线程情况下使用LinkedList(*)、PriorityQueue(优先队列);多线程情况下可以阻塞队列ArrayBlockingQueue(有界)、LinkedBlockingQueue(*)、DelayQueue(延迟队列 *)、PriorityBlockingQueue(优先 *)、SynchronousQueue(没有容量的队列)。可以看到java的api已经很强大了,可以根据自己的业务需求选择使用。使用方法:生产者从一端放入消息,消费者从另一端取出消息进行处理,消息放到队列里(感觉是不是有点像“消息队列”的定义)。
另外上面提到的“有界”和“*”,指的是队列的容量大小。有界 指的是创建队列时必须指定队列的容量;* 创建队列时无需指定队列的容量,容量大小取决于jvm实例分配的内存空间大小。在海量业务场景里,我们期望队列的容量是无限的,但单个jvm实例 即便是使用“*”队列 由于单个实例内存是有限的,最终无法容纳下海量的消息数据。聪明的程序员就想 能不能使用一个第三方的队列来存储这些数据呢?当然是可以的,这就产生了“消息队列”。
消息队列 一般是采用一个独立的集群专门用于消息存储,可以存储在内存里 也可以直接存储在磁盘中。比如常见的:RabbitMQ、kafka、rocketMQ、ActiveMQ、zeromq等等,它们有不同的特性,以及采用了各种不同的实现,适用于各种场景的消息任务分发。但他们本质作用跟上面讲的单实例环境中java“队列”没什么两样:在消息的传输过程中保存消息的容器。只是这里转换到“分布式”环境中而已。
可以看到这里这里提到的“传统”消息队列,都是一个很重型的集群。如果这个分布式环境中的消息数量有限,我们可以不必引入这种重型的mq框架。比如:本次分享的主题 如何使用redis实现“消息队列”。
redis中的消息队列
redis中可以使用自带的publish和subscribe命令完成“消息推送”和“消息拉取”功能,实现消息队列。但这种方式有一个缺陷就是,消费者必须一致在线,否则会出现消费遗漏。
redis中的list(本质上是个双向链表)、zset(有序set)都可以用做“消息队列”的容器,稍加处理就可以实现一个高可用的“消息队列”。使用redis实现的“轻量化”“消息队列”有三大优势:
1、现在redis已经广泛运用于各大系统中,无需再次引入其他第三方框架和api。
2、并且redis是基于内存存储的,生产者和消费者的存取速度都非常快。
3、使用redis集群的的容量,可以通过添加实例进行扩展。
首先思考下做一个轻量化的“消息队列”,需要满足些什么基本要求:
1、消费顺序保持跟生产顺序一致。
2、对于广播消息,某个消费者实例重启后,能重新收到消息。
3、定时清理 所有消费者都已经消费过的数据,防止容量无限增长。
满足以上三点要求,就可以实现一个简单的“消息队列”了。
使用zset实现“消息队列”示例展示
这是一个真实的业务场景,为了实现java中的jar包热更新,在web页面中录入一个jar包的下载地址,某个server接受到请求后,首先把这个地址存入到mysql数据à然后向消息队列中发布一条消息à其他所有server都会监听这个“消息队列”(广播模式),接受队列中的消息à解析出消息队列中的jar包下载地址,下载jar包到自己服务器的某个目录下,通过自定义classLoader执行热更新操作。本示例中主要展示的是使用redis实现“消息队列”,jar包热更新部分会省略。整个实现过程都是采用了多个redis的zset实现,这里简单说下zset的数据结构:zset对应的key;zset中的每个成员;每个成员对应的分值(可以用于排序)。
消息队列zset:这个有点类似传统消息队列中的topic(主题),不同的消息类型可以放到不同的zset中。本示例中只有一种业务类型:jar包上传,只需定义一个zset即可:szet的key为upload_msg;每个成员为jar包链接;成员对应的分值为一个自增的id(可以通过redis的incr方法实现)。该zset结构示意图如下:
主题已消费记录zset:这个zset用于存储各个消费者(ip地址),已经消费的消息id。主要用途是用于清理“消息队列zset”中所有消费者都已经消费的消息。该zset数据结构:szet可key为 topic_upload;每个成员为消费者服务器ip;成员分值为消息id。该zset结构示意图如下:
消费者已消费记录zset:这“类”zset用于存储每个消费者(ip地址) 已消费的各个topic中的消息id。主要用途是,程序重启时,重新继续消费每个topic中剩余的消息。本示例中有两个消费者ip: 192.168.1.100、192.168.1.101,对应有两个szset,key分别为:server_192.168.1.100、server_192.168.1.100;成员为topic的key,这里只有1个topic对应的key为 upload_msg;成员分值为该ip已消费指定topic的消息id。这“类”zset的结构示意图如下:
各个数据结构设计完成后,下面开始来具体实现,这里采用的是java伪代码实现(没办法 就只擅长这个)。实现过程分三步:redis数据结构初始化、发送消息、接收消息,下面分别进行实现,首先定义redis中的key:
private static Jedis redis; public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key public static final String TOPIC_ZSET="topic_upload";//主题已消费记录zset key public static final String IP_ZSET_PRE="Server_";//消费者已消费记录zset key前缀 public static final String MSG_SEQ_ID="upload_seq_id";//消息队列 msg_id自增生成器 对应key
redis数据结构初始化:程序启动时(一般是生成者)首先检查 “题已消费记录zset”(或者消息队列zset)是否已经创建,如果没有则进行初始化,java伪代码如下:
//每个ip 启动时执行 public static void init(){ //获取本机ip String ipAddr = geIp(); if(ipAddr !=null){ Long ret = redis.zrank(TOPIC_ZSET, ipAddr); if(ret == null){//如果未创建,就开始初始化 String max_id = redis.get(MSG_SEQ_ID);//获取自增序列号 Long score = 0l; if(max_id != null){ score = Long.valueOf(max_id); } redis.zadd(TOPIC_ZSET,score,ipAddr);//初始化 主题已消费记录zset redis.zadd(IP_ZSET_PRE+ipAddr,score,TOPIC_ZSET);//初始化 消息队列zset(主题) } } }
发送消息:本过程比较简单,就是生产者服务器接受到jar包上传请求后,首先入库,然后向“消息队列”中发送一条消息,java伪代码实现如下:
public static void sendMsg(){ //省略入mysql库等业务方法 String upload_url = "xxxx"; Long now = System.currentTimeMillis(); //加时间搓,可以是实现重复上传同一个jar,也可以去掉 String msg = upload_url+"|"+now; //生成消息Id Long msg_id = redis.incr(MSG_SEQ_ID); System.out.println(msg_id); //发送消息 想消息队列中添加一条新消息 redis.zadd(MSG_ZSET,msg_id,msg); }
接收消息:该过程首先获取当前消费者ip已经执行消息id,到“消息队列”中获取该id之后的所有新消息进行业务处理;处理完成后更新“已消费记录”;最后清理所有消费者都消费过的消息。Java实现伪代码如下:
public static void receiveMsg(){ //获取本机ip String ipAddr = geIp(); if(ipAddr!=null){ while (true){ //获取当前已消费的msg_id Double score = redis.zscore(IP_ZSET_PRE+ipAddr,TOPIC_ZSET); System.out.println(score); //获取未读消息进行处理 Set<Tuple> tuples2 = redis.zrangeByScoreWithScores(MSG_ZSET, score.longValue() + 1 + "", "inf"); Double lastMsg_id = 0d; for (Tuple t : tuples2) {//模拟jar包下载,以及热更新 业务操作 lastMsg_id = t.getScore(); System.out.println(t.getElement() + ":" + t.getScore()); } if(tuples2.size()>0){ //处理完成后,更新该服务器的已处理列表 redis.zadd(TOPIC_ZSET,lastMsg_id,ipAddr); redis.zadd(IP_ZSET_PRE+ipAddr,lastMsg_id,TOPIC_ZSET); //找出所以ip都消费过的消息id,其实就是zset的第一个成员 Set<Tuple> first = redis.zrangeWithScores(TOPIC_ZSET,0,0); Double allReceive_id = 0d; if(first.iterator().hasNext()){ Tuple temp = first.iterator().next(); if(temp!=null){ allReceive_id = temp.getScore(); redis.zremrangeByScore(MSG_ZSET,0,allReceive_id); } } } try { Thread.sleep(1000);//每隔1秒钟消费一次 } catch (InterruptedException e) { e.printStackTrace(); } } } }
完整示例展示
完整代码实现如下,可以执行main方法进行测试:
public class RedisQueue { private static Jedis redis; public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key public static final String TOPIC_ZSET="topic_upload";//主题已消费记录zset key public static final String IP_ZSET_PRE="Server_";//消费者已消费记录zset key前缀 public static final String MSG_SEQ_ID="upload_seq_id";//消息队列 msg_id自增生成器 对应key public static void main(String[] args) { redis = new Jedis("192.168.26.128", 6379); init();//初始化,每个服务启动时 sendMsg();//模拟发送消息 receiveMsg();//消费消息 } //每个ip 启动时执行 public static void init(){ //获取本机ip String ipAddr = geIp(); if(ipAddr !=null){ Long ret = redis.zrank(TOPIC_ZSET, ipAddr); if(ret == null){//如果未创建,就开始初始化 String max_id = redis.get(MSG_SEQ_ID);//获取自增序列号 Long score = 0l; if(max_id != null){ score = Long.valueOf(max_id); } redis.zadd(TOPIC_ZSET,score,ipAddr);//初始化 主题已消费记录zset redis.zadd(IP_ZSET_PRE+ipAddr,score,TOPIC_ZSET);//初始化 消息队列zset(主题) } } } public static void sendMsg(){ //省略入mysql库等业务方法 String upload_url = "xxxx"; Long now = System.currentTimeMillis(); //加时间搓,可以是实现重复上传同一个jar,也可以去掉 String msg = upload_url+"|"+now; //生成消息Id Long msg_id = redis.incr(MSG_SEQ_ID); System.out.println(msg_id); //发送消息 想消息队列中添加一条新消息 redis.zadd(MSG_ZSET,msg_id,msg); } public static void receiveMsg(){ //获取本机ip String ipAddr = geIp(); if(ipAddr!=null){ while (true){ //获取当前已消费的msg_id Double score = redis.zscore(IP_ZSET_PRE+ipAddr,TOPIC_ZSET); System.out.println(score); //获取未读消息进行处理 Set<Tuple> tuples2 = redis.zrangeByScoreWithScores(MSG_ZSET, score.longValue() + 1 + "", "inf"); Double lastMsg_id = 0d; for (Tuple t : tuples2) {//模拟jar包下载,以及热更新 业务操作 lastMsg_id = t.getScore(); System.out.println(t.getElement() + ":" + t.getScore()); } if(tuples2.size()>0){ //处理完成后,更新该服务器的已处理列表 redis.zadd(TOPIC_ZSET,lastMsg_id,ipAddr); redis.zadd(IP_ZSET_PRE+ipAddr,lastMsg_id,TOPIC_ZSET); //找出所以ip都消费过的消息id,其实就是zset的第一个成员 Set<Tuple> first = redis.zrangeWithScores(TOPIC_ZSET,0,0); Double allReceive_id = 0d; if(first.iterator().hasNext()){ Tuple temp = first.iterator().next(); if(temp!=null){ allReceive_id = temp.getScore(); redis.zremrangeByScore(MSG_ZSET,0,allReceive_id); } } } try { Thread.sleep(1000);//每隔1秒钟消费一次 } catch (InterruptedException e) { e.printStackTrace(); } } } } private static String geIp(){ //获取本机ip String ipAddr = null; try { ipAddr = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); } return ipAddr; } }
一个轻量级的redis消息队列实现,上述代码基本都可以满足。如果生产者比较繁忙的话,又要保证消费顺序的前提下,在sendMsg()方法上需要使用redis分布式锁,来解决“线程安全”问题。关于redis实现的分布式锁 这里不细讲,后面有时间再单独总结(注意和前一章讲的jvm内部的类锁和对象锁区别开来,他们的关系有点类似于队列和消息队列的关系,也就是在单实例和分布式环境下的区别)。
出处:
http://moon-walker.iteye.com/blog/2401516