消息队列
消息队列相关笔记
消息队列的应用场景:
- 消费者执行过程比较长且生产者不需要消费者返回结果。用于更新索引库,生成商品详情页,发短信。
为什么要使用消息队列:
- 通过异步处理提高系统性能(削峰、减少响应所需时间);
- 降低系统耦合性。
- 削锋作用:通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。
消息队列带来的问题:
- 系统可用性降低:系统引入的外部依赖越多,系统越容易出问题。如果MQ出问题,整套系统更容易崩溃。
- 系统复杂性提高:加入消息队列后,需要保证消息没有被重复消费,保证消息传递的顺序性等等。
- 一致性问题:消息没有被真正消费者正确消费,会导致数据不一致的情况。
JMS:Java Message Service,是一个消息服务的标准和规范,允许应用程序组件基于JaveEE平台创建、发送、接收和读取消息。
JMS的两种消息模型:
- 点到点模型(P2P):
使用队列作为消息通信载体,一条消息只能被一个消费者使用,未消费的消息在队列中保留直到被消费或者超时。
- 发布/订阅模型(Pub/Sub):
发布者发布一条信息,通过主题传递给所有的订阅者,在一条消息广播后才订阅的用户收不到该条消息。
JMS的五种消息正文格式:
- StreamMessage——Java原始值的数据流
- MapMessage——一套键值对
- TextMessage——一个字符串对象
- ObjectMessage——一个序列化的Java对象
- BytesMessage——一个字节的数据流
- AMQP:Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议)。
如何保证消息队列的高可用?
以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。
RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。
- 单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式。
- 普通集群模式,无高可用性,在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。这个方案提高了吞吐量,但没有高可用性。
-
镜像集群模式,有高可用性,queue里的消息会存在于多个实例上,每个RabbitMQ节点都有这个queue的一个完整镜像,包含queue的全部数据。该模式的缺点:性能开销过大,带宽压力和消耗很重;不是分布式的,没有扩展性。
Kafka的高可用性:由多个broker组成,每个broker是一个节点;你创建的一个topic会被划分为多个partition,每个partition存储于不同的broker上,每个partition包含部分topic的数据,是一个天然的分布式消息队列。
Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。
每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。
所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。
这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的,如果这上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。
如何保证消息队列的幂等性?
- 产生重复消费的可能原因:消息读写过程中,遇到意外kill进程或重启,导致consumer有些消息处理了,但没来得及提交offset,重启后少数消息会再被消费一次。(offset代表消息的序号,消费过的offset会被提交)
-
保证MQ的消费幂等性,需要结合具体业务来看,大体思路有:
- Redis的写入没有幂等性问题,每次都是set,天然幂等性。
- 主键或者唯一的id,可以在消费前进行查询一下,是否消费过。
- 数据库的唯一键可以保证重复数据不会插入多条,重复数据插入时会报错。
如何处理消息丢失的问题?
-
生产者弄丢了数据:
生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务
channel.txSelect
,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback
,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit
。// 开启事务 channel.txSelect try { // 这里发送消息 } catch (Exception e) { channel.txRollback // 这里再次重发这条消息 } // 提交事务 channel.txCommit
但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能。
所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启
confirm
模式,在生产者那里设置开启confirm
模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个ack
消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个nack
接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。事务机制和
cnofirm
机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm
机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用confirm
机制的。 -
RabbitMQ弄丢了数据:
必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
设置持久化有两个步骤:
- 创建 queue 的时候将其设置为持久化
这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。 - 第二个是发送消息的时候将消息的
deliveryMode
设置为 2
就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
- 创建 queue 的时候将其设置为持久化
- 消费端弄丢了数据
这个时候得用 RabbitMQ 提供的
ack
机制,简单来说,就是你必须关闭 RabbitMQ 的自动ack
,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里ack
一把。这样的话,如果你还没处理完,不就没有ack
了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。ack消息延迟:
如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个
ack
消息,告诉你说这个消息 ok 了。→持久化可以跟生产者那边的
confirm
机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack
了。→关闭 RabbitMQ 的自动
ack
,可以通过一个 api 来调用就行,然后每次确保消费端处理完数据的时候,再在程序里ack
一把。
-
各类MQ对比
特性 ActiveMQ RabbitMQ RocketMQ Kafka 单击吞吐量 万 级 万级 10万级 10万级 topic数量对吞吐量的影响 topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 topic从几十个到几百个的时候,吞吐量会大幅度下降 时效性 ms级 微秒级,延迟最低 ms级 ms级 可用性 高 高 非常高,分布式架构 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 消息可靠性 低概率丢失数据 经过参数优化配置,可以做到0丢失 经过参数优化配置,可以做到0丢失 功能支持 功能极其完备 基于erlang开发,所以并发能力很强,性能极其好,延时很低 MQ功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 -
代码
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency>
//生产者 @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination queueSolrDestination; @Autowired private Destination topicPageDestination; //更新索引库 jmsTemplate.send(queueSolrDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(jsonString); } }); //生成商品详情页 jmsTemplate.send(topicPageDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(id + ""); } });
//消费者(监听) @Component public class PageListener implements MessageListener{ @Autowired private ItemPageService itemPageService; @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println("接收到消息:" + text); boolean b = itemPageService.genItemHtml(Long.parseLong(text)); System.out.println("网页生成结果:" + b); } catch (JMSException e) { e.printStackTrace(); } } }
上一篇: 路灯算法