欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

理解队列、消息队列--用redis实现消息队列

程序员文章站 2022-03-07 19:17:31
...

理解队列和消息队列

 

队列(来自百度百科):是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。

 

消息队列(来自百度百科):是在消息的传输过程中保存消息的容器。

 

从队列和消息队列的定义看来,看不出什么相似之处。但我理解它们的作用是相似的,只是使用环境不同。队列和消息队列 本质上都可以用于解决生产者和“消费者”问题,在二者这间建立桥梁,it中专业术语是对生产者和“消费者”进行解耦。可以动态的通过调整生产者和“消费者”线程数或服务器实例数,在正常情况使消费和生产到达一个平衡;在高峰情况下(生产者大于消费者)可以保护消费者不被拖垮的同时,还可以对把积压的数据保存下来,消费者可以延迟消费这些数据进行处理。

 

队列 一般指的是单个服务实例内部使用,比如,在java中的一个jvm实例内部可以使用Queue的子类(Deque:双端队列,是Queue的子接口),比如:单线程情况下使用LinkedList(*)PriorityQueue(优先队列);多线程情况下可以阻塞队列ArrayBlockingQueue(有界)LinkedBlockingQueue(*)DelayQueue(延迟队列 *)PriorityBlockingQueue(优先 *)SynchronousQueue(没有容量的队列)。可以看到javaapi已经很强大了,可以根据自己的业务需求选择使用。使用方法:生产者从一端放入消息,消费者从另一端取出消息进行处理,消息放到队列里(感觉是不是有点像“消息队列”的定义)。


理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 
 

 

另外上面提到的有界*,指的是队列的容量大小。有界 指的是创建队列时必须指定队列的容量;* 创建队列时无需指定队列的容量,容量大小取决于jvm实例分配的内存空间大小。在海量业务场景里,我们期望队列的容量是无限的,但单个jvm实例 即便是使用*队列 由于单个实例内存是有限的,最终无法容纳下海量的消息数据。聪明的程序员就想 能不能使用一个第三方的队列来存储这些数据呢?当然是可以的,这就产生了消息队列

 

消息队列 一般是采用一个独立的集群专门用于消息存储,可以存储在内存里 也可以直接存储在磁盘中。比如常见的:RabbitMQkafkarocketMQActiveMQzeromq等等,它们有不同的特性,以及采用了各种不同的实现,适用于各种场景的消息任务分发。但他们本质作用跟上面讲的单实例环境中java“队列没什么两样:在消息的传输过程中保存消息的容器。只是这里转换到分布式环境中而已。


理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 
 

 

可以看到这里这里提到的传统消息队列,都是一个很重型的集群。如果这个分布式环境中的消息数量有限,我们可以不必引入这种重型的mq框架。比如:本次分享的主题 如何使用redis实现“消息队列”。

 

redis中的消息队列

 

redis中可以使用自带的publishsubscribe命令完成消息推送消息拉取功能,实现消息队列。但这种方式有一个缺陷就是,消费者必须一致在线,否则会出现消费遗漏。

 

redis中的list(本质上是个双向链表)、zset(有序set)都可以用做消息队列的容器,稍加处理就可以实现一个高可用的消息队列。使用redis实现的“轻量化”消息队列有三大优势:

1、现在redis已经广泛运用于各大系统中,无需再次引入其他第三方框架和api

2、并且redis是基于内存存储的,生产者和消费者的存取速度都非常快。

3、使用redis集群的的容量,可以通过添加实例进行扩展。

 

首先思考下做一个轻量化的消息队列,需要满足些什么基本要求:

1、消费顺序保持跟生产顺序一致。

2、对于广播消息,某个消费者实例重启后,能重新收到消息。

3、定时清理 所有消费者都已经消费过的数据,防止容量无限增长。

 

满足以上三点要求,就可以实现一个简单的消息队列了。

 

使用zset实现“消息队列”示例展示

 

这是一个真实的业务场景,为了实现java中的jar包热更新,在web页面中录入一个jar包的下载地址,某个server接受到请求后,首先把这个地址存入到mysql数据à然后向消息队列中发布一条消息à其他所有server都会监听这个消息队列(广播模式),接受队列中的消息à解析出消息队列中的jar包下载地址,下载jar包到自己服务器的某个目录下,通过自定义classLoader执行热更新操作。本示例中主要展示的是使用redis实现消息队列jar包热更新部分会省略。整个实现过程都是采用了多个rediszset实现,这里简单说下zset的数据结构:zset对应的keyzset中的每个成员;每个成员对应的分值(可以用于排序)。

 

消息队列zset:这个有点类似传统消息队列中的topic(主题),不同的消息类型可以放到不同的zset中。本示例中只有一种业务类型:jar包上传,只需定义一个zset即可:szetkeyupload_msg;每个成员为jar包链接;成员对应的分值为一个自增的id(可以通过redisincr方法实现)。该zset结构示意图如下:


理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 
 

 

主题已消费记录zset:这个zset用于存储各个消费者(ip地址),已经消费的消息id。主要用途是用于清理“消息队列zset”中所有消费者都已经消费的消息。该zset数据结构:szetkey topic_upload;每个成员为消费者服务器ip;成员分值为消息id。该zset结构示意图如下:


理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 
 

 

消费者已消费记录zset:这“类”zset用于存储每个消费者(ip地址) 已消费的各个topic中的消息id。主要用途是,程序重启时,重新继续消费每个topic中剩余的消息。本示例中有两个消费者ip: 192.168.1.100192.168.1.101,对应有两个szset,key分别为:server_192.168.1.100server_192.168.1.100;成员为topickey,这里只有1topic对应的key upload_msg;成员分值为该ip已消费指定topic的消息id。这“类”zset的结构示意图如下:


理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 

理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 

各个数据结构设计完成后,下面开始来具体实现,这里采用的是java伪代码实现(没办法 就只擅长这个)。实现过程分三步:redis数据结构初始化、发送消息、接收消息,下面分别进行实现,首先定义redis中的key

    private static Jedis redis;
 
    public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key
    public static final String TOPIC_ZSET="topic_upload";//主题已消费记录zset key
    public static final String IP_ZSET_PRE="Server_";//消费者已消费记录zset key前缀
    public static final String MSG_SEQ_ID="upload_seq_id";//消息队列 msg_id自增生成器 对应key

redis数据结构初始化:程序启动时(一般是生成者)首先检查 “题已消费记录zset(或者消息队列zset)是否已经创建,如果没有则进行初始化,java伪代码如下:

//每个ip 启动时执行
    public static void init(){
        //获取本机ip
        String ipAddr = geIp();
        if(ipAddr !=null){
            Long ret = redis.zrank(TOPIC_ZSET, ipAddr);
            if(ret == null){//如果未创建,就开始初始化
                String max_id = redis.get(MSG_SEQ_ID);//获取自增序列号
                Long score = 0l;
                if(max_id != null){
                    score = Long.valueOf(max_id);
                }
                redis.zadd(TOPIC_ZSET,score,ipAddr);//初始化 主题已消费记录zset
                redis.zadd(IP_ZSET_PRE+ipAddr,score,TOPIC_ZSET);//初始化 消息队列zset(主题)
            }
        }
}
 

 

发送消息:本过程比较简单,就是生产者服务器接受到jar包上传请求后,首先入库,然后向消息队列中发送一条消息,java伪代码实现如下:

 

    public static void sendMsg(){
        //省略入mysql库等业务方法
        String upload_url = "xxxx";
        Long now = System.currentTimeMillis();
        //加时间搓,可以是实现重复上传同一个jar,也可以去掉
        String msg = upload_url+"|"+now;
 
        //生成消息Id
        Long msg_id = redis.incr(MSG_SEQ_ID);
        System.out.println(msg_id);
 
        //发送消息 想消息队列中添加一条新消息
        redis.zadd(MSG_ZSET,msg_id,msg);
}
 

 

接收消息:该过程首先获取当前消费者ip已经执行消息id,到消息队列中获取该id之后的所有新消息进行业务处理;处理完成后更新“已消费记录”;最后清理所有消费者都消费过的消息。Java实现伪代码如下:

 
public static void receiveMsg(){
        //获取本机ip
        String ipAddr = geIp();
        if(ipAddr!=null){
            while (true){
                //获取当前已消费的msg_id
                Double score = redis.zscore(IP_ZSET_PRE+ipAddr,TOPIC_ZSET);
                System.out.println(score);
 
                //获取未读消息进行处理
                Set<Tuple> tuples2 = redis.zrangeByScoreWithScores(MSG_ZSET, score.longValue() + 1 + "", "inf");
                Double lastMsg_id = 0d;
                for (Tuple t : tuples2) {//模拟jar包下载,以及热更新 业务操作
                    lastMsg_id = t.getScore();
                    System.out.println(t.getElement() + ":" + t.getScore());
                }
 
                if(tuples2.size()>0){
                    //处理完成后,更新该服务器的已处理列表
                    redis.zadd(TOPIC_ZSET,lastMsg_id,ipAddr);
                    redis.zadd(IP_ZSET_PRE+ipAddr,lastMsg_id,TOPIC_ZSET);
 
                    //找出所以ip都消费过的消息id,其实就是zset的第一个成员
                    Set<Tuple> first = redis.zrangeWithScores(TOPIC_ZSET,0,0);
                    Double allReceive_id = 0d;
                    if(first.iterator().hasNext()){
                        Tuple temp = first.iterator().next();
                        if(temp!=null){
                            allReceive_id = temp.getScore();
                            redis.zremrangeByScore(MSG_ZSET,0,allReceive_id);
                        }
                    }
                }
 
 
 
                try {
                    Thread.sleep(1000);//每隔1秒钟消费一次
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
 
}

完整示例展示

 

完整代码实现如下,可以执行main方法进行测试:

public class RedisQueue {
    private static Jedis redis;
 
    public static final String MSG_ZSET="upload_msg";//消息队列zset(主题) key
    public static final String TOPIC_ZSET="topic_upload";//主题已消费记录zset key
    public static final String IP_ZSET_PRE="Server_";//消费者已消费记录zset key前缀
    public static final String MSG_SEQ_ID="upload_seq_id";//消息队列 msg_id自增生成器 对应key
 
    public static void main(String[] args) {
        redis = new Jedis("192.168.26.128", 6379);
        init();//初始化,每个服务启动时
        sendMsg();//模拟发送消息
        receiveMsg();//消费消息
 
    }
 
    //每个ip 启动时执行
    public static void init(){
        //获取本机ip
        String ipAddr = geIp();
        if(ipAddr !=null){
            Long ret = redis.zrank(TOPIC_ZSET, ipAddr);
            if(ret == null){//如果未创建,就开始初始化
                String max_id = redis.get(MSG_SEQ_ID);//获取自增序列号
                Long score = 0l;
                if(max_id != null){
                    score = Long.valueOf(max_id);
                }
                redis.zadd(TOPIC_ZSET,score,ipAddr);//初始化 主题已消费记录zset
                redis.zadd(IP_ZSET_PRE+ipAddr,score,TOPIC_ZSET);//初始化 消息队列zset(主题)
            }
        }
    }
 
    public static void sendMsg(){
        //省略入mysql库等业务方法
        String upload_url = "xxxx";
        Long now = System.currentTimeMillis();
        //加时间搓,可以是实现重复上传同一个jar,也可以去掉
        String msg = upload_url+"|"+now;
 
        //生成消息Id
        Long msg_id = redis.incr(MSG_SEQ_ID);
        System.out.println(msg_id);
 
        //发送消息 想消息队列中添加一条新消息
        redis.zadd(MSG_ZSET,msg_id,msg);
    }
 
    public static void receiveMsg(){
        //获取本机ip
        String ipAddr = geIp();
        if(ipAddr!=null){
            while (true){
                //获取当前已消费的msg_id
                Double score = redis.zscore(IP_ZSET_PRE+ipAddr,TOPIC_ZSET);
                System.out.println(score);
 
                //获取未读消息进行处理
                Set<Tuple> tuples2 = redis.zrangeByScoreWithScores(MSG_ZSET, score.longValue() + 1 + "", "inf");
                Double lastMsg_id = 0d;
                for (Tuple t : tuples2) {//模拟jar包下载,以及热更新 业务操作
                    lastMsg_id = t.getScore();
                    System.out.println(t.getElement() + ":" + t.getScore());
                }
 
                if(tuples2.size()>0){
                    //处理完成后,更新该服务器的已处理列表
                    redis.zadd(TOPIC_ZSET,lastMsg_id,ipAddr);
                    redis.zadd(IP_ZSET_PRE+ipAddr,lastMsg_id,TOPIC_ZSET);
 
                    //找出所以ip都消费过的消息id,其实就是zset的第一个成员
                    Set<Tuple> first = redis.zrangeWithScores(TOPIC_ZSET,0,0);
                    Double allReceive_id = 0d;
                    if(first.iterator().hasNext()){
                        Tuple temp = first.iterator().next();
                        if(temp!=null){
                            allReceive_id = temp.getScore();
                            redis.zremrangeByScore(MSG_ZSET,0,allReceive_id);
                        }
                    }
                }
 
                try {
                    Thread.sleep(1000);//每隔1秒钟消费一次
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
 
    }
 
    private static String geIp(){
        //获取本机ip
        String ipAddr = null;
        try {
            ipAddr = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return ipAddr;
    }
}
 

 

 

一个轻量级的redis消息队列实现,上述代码基本都可以满足。如果生产者比较繁忙的话,又要保证消费顺序的前提下,在sendMsg()方法上需要使用redis分布式锁,来解决线程安全问题。关于redis实现的分布式锁 这里不细讲,后面有时间再单独总结(注意和前一章讲的jvm内部的类锁和对象锁区别开来,他们的关系有点类似于队列和消息队列的关系,也就是在单实例和分布式环境下的区别)

 

出处:

http://moon-walker.iteye.com/blog/2401516

  • 理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 
  • 大小: 15.7 KB
  • 理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 
  • 大小: 30.7 KB
  • 理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 
  • 大小: 13.6 KB
  • 理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 
  • 大小: 13.7 KB
  • 理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 
  • 大小: 11.7 KB
  • 理解队列、消息队列--用redis实现消息队列
            
    
    博客分类: redis 消息队列队列redis实现消息队列 
  • 大小: 11.5 KB