欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

记一次消息总线的打造 RabbitMQMQ

程序员文章站 2022-07-13 12:09:03
...

虽说消息队列的用法很简单:PUB/SUB, PRODUCER/CONSUMER,不过真做起来还真麻烦。

先说下原始需求:

  • Web前端发送命令消息,后端Consumer处理,然后前端得到结果
  • 需要支持Windows服务

很快,下图就出来了:

记一次消息总线的打造
            
    
    
        RabbitMQMQ

先来分析分析:

    • 前端怎么知道后端已经处理完成?
    • 前端如何在处理完后的第一时间被触发去执行某些callback呢?
    • Web前端很可能会通过ajax来定时查看某消息的处理状态

  第一反应是增加应答队列,此时:

    • 前端能够很及时的被通知到(后端处理完触发),来执行callback
    • 但是
      • ajax类型的定时查看怎么做?在ResponseQueue中查?显然不行(队列中数据越多,性能越差)
      • 如果前端关闭一段时间,消息会积压下来,性能越变越差

  因此决定增加一个DB来解决这些消息的保存以及后续的ajax类型的多次查询,如下图:

记一次消息总线的打造
            
    
    
        RabbitMQMQ

 再来分析分析,此时

    • 前端ajax类型的不定时、多次的查询某消息处理状态是解决了
    • 如果消息量很大,也可以将RabbitMQ以及DB分别做集群以及切片
    • 但,似乎还是得增加应答队列进去,因为现在CONSUMER处理完成后,针对前端的通知很麻烦,理由如下
      • 基于DB行记录的通知效率低
    • 但,即便增加了这个应答队列,也会出现如下问题
      • 如果前端崩掉后有段时间未on service,此时应答队列就会积压消息...性能会变差

  此时该咋办?

  答:用PUB/SUB机制来做这个应答队列,此时如果前端崩掉,就不会SUB了,只要online时才会有消息被通知到

  因此,继续出一张图

    记一次消息总线的打造
            
    
    
        RabbitMQMQ

  图中的Notifier, NotifierPublisher是前端和后端的BROKER,考虑到有些线程需要主动监听,因此画在了上面。

 

  再来谈谈后端,由于没有特别高要求,对后端的要求也就是这么几点:

    • 在业务逻辑角度,消息只能被无错处理一次
    • 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
    • 对于报Exception的消息,人工处理要方便

  分别分析    

    • 在业务逻辑角度,消息只能被无错处理一次
      • 在业务处理没有报错的情况下,将RabbitMQ消息的Ack动作与DB的消息状态回写做成一个TRANSACTION,如下
      • 记一次消息总线的打造
            
    
    
        RabbitMQMQ
      • 这样就能保证此消息“从RABBITMQ Server中remove、写入查询DB”同时确保
      • 但是,如果存在下述情况时,会出现即便业务逻辑没有报错情况下多次执行
        • 那就是:如果业务逻辑执行完毕,没有报错,此时,即将触发上述代码,却还没有触发的时刻,服务crash了...
        • 解决方法
          • 在业务逻辑代码中加入幂等性
          • 在业务逻辑代码中加入检查性质代码
          • 告诉我下其他简便的方法吧(记录本地文件日志能解决,就是比较复杂)
    • 如果出现了Exception, 则需要后续人工介入,消息不能丢失,但也不应该造成无限循环的报Exception
      • 增加相应的Exception队列,实际中是增加了2个:如:
        • 比如目前有队列messages.CommandA,则异常队列有:
          • messages.CommandA.exceptions1
          • messages.CommandA.exceptions2
          • 为啥是2个?看后续
    • 对于报Exception的消息,人工处理要方便

      • 在配置文件中增加一个参数,表示运行级别:普通、异常1、异常2
      • 如果是普通级别,则CONSUMER会从messages.CommandA中获取消息进行处理,报错后会将消息move到exception1中
      • 如果是异常1级别,则CONSUMER会从messages.CommandA.exception1中获取消息进行处理,报错后会将消息move到exception2中
      • 如果是异常2级别,则CONSUMER会从messages.CommandA.exception2中获取消息进行处理,报错后将消息move到exception1中
      • 这里还有个问题,就是上面的这些RABBITMQ级别的消息从exception1移动到exception2中,都是分成PUBLISH和BASICACK两个CHANNEL上的动作完成的,不能套RABBITMQ的TX,也就是存在一致性问题
        • 解决起来同HandleSuccessfulMessage类似,都是通过本地db事务来做,都是借助了BASE思想来实现的

 

剩下的一个问题,RABBITMQ有优先级队列特性吗?答案是有:

记一次消息总线的打造
            
    
    
        RabbitMQMQ

 

注意:RABBITMQ默认是不支持客户端消息的优先级的,默认只支持Consumer的优先级,那如何实现客户端消息的优先级呢?

    1. 发送消息时,设置properties属性,记得设置Priority属性值
    2. 安装插件rabbitmq_priority_queue
    3. DeclareQueue时,加入x-max-priority特性值

 

DONE.

 

[转载自:http://www.cnblogs.com/aarond/p/MessageBus.html]

相关标签: RabbitMQ MQ