欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

消息队列全面了解

程序员文章站 2022-03-03 10:37:23
消息队列都应用到了哪些实际的应用场景中? 一、再谈消息队列的应用场景 1、异步处理:例如短信通知、终端状态推送、App推送、用户注册等 2、数据同部:业务数据推送同步 3、重试补偿:记账失败重试 4、系统解耦:通讯上下行、终端异常监控、分布式事件中心 5、流量削峰:秒杀场景下的下单处理 6、发布订阅 ......

消息队列都应用到了哪些实际的应用场景中?

一、再谈消息队列的应用场景

1、异步处理:例如短信通知、终端状态推送、app推送、用户注册等

2、数据同部:业务数据推送同步

3、重试补偿:记账失败重试

4、系统解耦:通讯上下行、终端异常监控、分布式事件中心

5、流量削峰:秒杀场景下的下单处理

6、发布订阅:hsf的服务状态变化通知、分布式事件中心

7、高并发缓冲:日志服务、监控上报

但是,我们对消息队列的底层技术和原理还是不了解,那么我们马上开始吧。

二、消息队列的一些基本概念和简单原理

1、broker

broker的概念来自于apache activemq,通俗的讲就是mq的服务器。

2、消息的生产者、消费者

消息生产者producer:发送消息到消息队列。

消息消费者consumer:从消息队列接收消息。

消息队列全面了解

 

 3、点对点消息队列模型

消息生产者向一个特定的队列发送消息,消息消费者从该队列中接收消息;消息的生产者和消费者可以不同时处于运行状态。每个成功处理的消息都由消息消费者签收确认(acknowledge)。如图:

消息队列全面了解

 

 4、发布订阅消息模型-topic

发布订阅消息模型中,支持向一个特定的主题topic发布消息,0个或多个订阅者接收来自这个主题的消息。这种情模型下,发布者和订阅者闭此不知道对方。实际操作过程中,必须先订阅,再发送消息,而后接收订阅的消息,这个顺序必须保证。

消息队列全面了解

 

 

5、消息的顺序性保证

基于queue消息模型,利用fifo先进先出的特性,可以保证消息的顺序性。

6、消息的ack确认机制

即消息的acknowledge确认机制

为了确保消息不丢失,消息队列提供了消息acknowledge机制,即ack机制,当consumer确认消息已经被消费者处理,发送一个ack给消息队列,此时消息队列便可以删除这个 消息了。如果consumer宕机/关闭,没有发送ack,消息队列讲人为这个消息没有被处理,会将这个消息发送给其他的consumer重新消息处理。

7、消息的持久化

消息的持久化,对于一些关键的核心业务来说是非常重要的,启用消息持久化后,消息队列宕机重启后,消息而已从持久化存储恢复,消息不丢失,可以继续消费处理。

8、消息的同部和异步收发

同部:消息的收发支持同步收发的方式。

同时还有另一种同步方式:同步收发场景下,消息生产者和消费者双向应答模式,例如:张三写封信松道邮局中转站,然后李四从中转站获得信,然后在写一份回执信,放到中转站,然后张三去取,当然张三写信的时候就得写明回信地址。

消息的接收如果以同步的方式(pull)进行接收,如哦队列中为空,此时接收处于同步阻塞状态。会一直等待,直到消息的到达。

异步:消费的收发同样支持异步方式:异步发送 消息,不需要等待消息队列的接收确认;异步接收消息,以push的方式触发消息消费者接收消息。

9、消息的事务支持

消息的收发处理支持事务,例如:在任务中心场景中,一次处理可能处理涉及多个消息的接收、处理,这处于同一个事务范围内,如果一个消息处理失败,事务回滚,消息重新回到队列中。

三、我们对消息队列的实际使用

我在实际的项目中,使用过两种消息队列组件:

rabbitmq:高可用、高可靠消息应用场景,例如记账失败重试、通知服务,消息不允许丢

kafka:高性能消息应用场景,例如日志、监控、消息允许丢失。

在此之上,我们封装了消息应用中心,日志服务等核心组件和服务,那么,消息应用中心和日志都用到了消息队列什么技术?干活来了……

1、消息应用中心

消息应用中心(任务中心)使用了消息队列的异步处理、数据同步、重试补偿、系统解耦、流量削峰等特性。其中:消息应用中心(任务中心),支持rabbitmq和kafka两种消息通道,支持在任务元数据层面设置

任务:就是一个包含了任务执行上下文的消息,同时代表了异步处理

任务发送者(itasksender)发送任务:消息的生产者将任务消息发送到消息队列

任务类型:消息队列名称,例如:hakeepacco***queue,充电补偿记账队列

消息队列:任务的临时存储

任务中心:任务计中处理,消息消费者

任务处理完成:消息ack确认

任务的多级重试:多个重试消息队列,hasystaskstore2queue

2、日志组件

日志组件,使用了消息队列的高并发缓冲和发布订阅特性。其中:日志组件使用kafka作为消息通道,因为kafka的性能号,吞吐量大,可以容忍偶尔的消息数据丢失,日志组件使用发布订阅的消息模型,日志组件包含日志服务sdk和日志hsf服务,二者都是消息的生产者producer,日志类型:消息的topic主题。日志处理器:消息的消费者、topic的订阅、日志数据处理(hbase\es\其他)

3、rpc服务状态变化通知

rpc服务状态变化通知,使用了消息队列的发布订阅特性,其中:rpc服务状态变化通知,使用robbitmq消息队列技术,使用发布订阅的消息模型,topic:rpcservicestate。rpcservice.proxy:rpc服务状态变化消息的订阅者。rpc服务注册、发布:消息的生产者,发送rpc服务状态变化消息。

四、消息队列使用的最佳实践

1、rabbitmq的链接,底层都是socket链接,长连接 or 短链接?

rabbitmq在创建每个链接的同时,会自动创建一个监控线程来定时(默认60s)侦测链接的状态,如果链接断开,触发connectionshutdown事件。

用长连接,还是用短连接?

发送端:建议使用短连接,用完即释放,避免长连接带来的端口占用,因为发送端无处不在,发送操作短而急促。

接收端:建议使用长连接,时刻接收处理消息,因为消息的接收消费比较集中,接收操作久而弥坚。

2、网络是有抖动的,连接的断开是正常的,如何应对?

发送端:发送失败重试

接收端:注册connectionshutdown事件同时捕获消息接收异常,重新建立连接,接收消费消息。

3、rabbitmq exchange(topic)模式下带来的消息队列数量激增

只是创建了一个exchange(topic),为什么会增加这么多queue。以为,每个topic的订阅都是绑定一个queue用作消息的消费。

消息队列全面了解

4、需求的演变,消息结构的改变,如何平滑过渡?

消息是byte[]数组,我们将复杂对象消息二进制序列化。接收到消息后,我们将二进制数组反序列化为实体类。当我们的实体类消息体的结构发生变化后,因为受到二进制学历恶化处理的影响,导致无法反序列化。

解决方案:

消息体预留一些string类型的扩展字段。

消息队列版本化,支持多个版本的消息体。

5、kafka conusmer group

同一个topic的一条消息只能被同一个group内的一个consumer消费,多个consumer group可同时消费同一个消息。

消息队列全面了解

 6、消息的挤压

消息的挤压产生的原因:消息接收消费的速率低,发送的速度>接收的速度。

消息积压后得影响:

消息大量积压后,当新得消费者连接上mq并开始接收消息时,发送速率会大幅降低。消息队列集群得压力增加,大量得消息要持久化存储和同步。

如何减少消息挤压:快速消费消息,同时保持消息体不要过大。

这次的mq相关知识先分享到这里。

转自:https://www.cnblogs.com/xiaomowang/p/12729982.html