欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

消息队列

程序员文章站 2022-05-18 10:46:32
...

消息队列相关笔记

消息队列的应用场景:

  • 消费者执行过程比较长且生产者不需要消费者返回结果。用于更新索引库,生成商品详情页,发短信。

为什么要使用消息队列:

  1. 通过异步处理提高系统性能(削峰、减少响应所需时间);
  2. 降低系统耦合性。
  • 削锋作用:通过异步处理,将短时间高并发产生的事务消息存储在消息队列中,从而削平高峰期的并发事务。

消息队列带来的问题:

  1. 系统可用性降低:系统引入的外部依赖越多,系统越容易出问题。如果MQ出问题,整套系统更容易崩溃。
  2. 系统复杂性提高:加入消息队列后,需要保证消息没有被重复消费,保证消息传递的顺序性等等。
  3. 一致性问题:消息没有被真正消费者正确消费,会导致数据不一致的情况。

JMS:Java Message Service,是一个消息服务的标准和规范,允许应用程序组件基于JaveEE平台创建、发送、接收和读取消息。

JMS的两种消息模型:

  1. 点到点模型(P2P):

    消息队列

    使用队列作为消息通信载体,一条消息只能被一个消费者使用,未消费的消息在队列中保留直到被消费或者超时。

  2. 发布/订阅模型(Pub/Sub):

    消息队列

    发布者发布一条信息,通过主题传递给所有的订阅者,在一条消息广播后才订阅的用户收不到该条消息。

JMS的五种消息正文格式:

  1. StreamMessage——Java原始值的数据流
  2. MapMessage——一套键值对
  3. TextMessage——一个字符串对象
  4. ObjectMessage——一个序列化的Java对象
  5. BytesMessage——一个字节的数据流
  • AMQP:Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议)。

如何保证消息队列的高可用?

以 RabbitMQ 为例子讲解第一种 MQ 的高可用性怎么实现。

RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

  1. 单机模式,就是 Demo 级别的,一般就是你本地启动了玩玩儿的,没人生产用单机模式。
  2. 普通集群模式,无高可用性,在多台机器上启动多个 RabbitMQ 实例,每个机器启动一个。你创建的 queue,只会放在一个 RabbitMQ 实例上,没做到所谓的分布式,就是个普通集群。因为这导致你要么消费者每次随机连接一个实例然后拉取数据,要么固定连接那个 queue 所在实例消费数据,前者有数据拉取的开销,后者导致单实例性能瓶颈。这个方案提高了吞吐量,但没有高可用性。
  3. 镜像集群模式,有高可用性,queue里的消息会存在于多个实例上,每个RabbitMQ节点都有这个queue的一个完整镜像,包含queue的全部数据。该模式的缺点:性能开销过大,带宽压力和消耗很重;不是分布式的,没有扩展性。

    Kafka的高可用性:由多个broker组成,每个broker是一个节点;你创建的一个topic会被划分为多个partition,每个partition存储于不同的broker上,每个partition包含部分topic的数据,是一个天然的分布式消息队列。

    Kafka 0.8 以后,提供了 HA 机制,就是 replica(复制品) 副本机制。

    每个 partition 的数据都会同步到其它机器上,形成自己的多个 replica 副本。

    所有 replica 会选举一个 leader 出来,那么生产和消费都跟这个 leader 打交道,然后其他 replica 就是 follower。

    这么搞,就有所谓的高可用性了,因为如果某个 broker 宕机了,没事儿,那个 broker上面的 partition 在其他机器上都有副本的,如果这上面有某个 partition 的 leader,那么此时会从 follower 中重新选举一个新的 leader 出来,大家继续读写那个新的 leader 即可。

    消息队列

 

如何保证消息队列的幂等性?

  • 产生重复消费的可能原因:消息读写过程中,遇到意外kill进程或重启,导致consumer有些消息处理了,但没来得及提交offset,重启后少数消息会再被消费一次。(offset代表消息的序号,消费过的offset会被提交)
  • 保证MQ的消费幂等性,需要结合具体业务来看,大体思路有:

    1. Redis的写入没有幂等性问题,每次都是set,天然幂等性。
    2. 主键或者唯一的id,可以在消费前进行查询一下,是否消费过。
    3. 数据库的唯一键可以保证重复数据不会插入多条,重复数据插入时会报错。

如何处理消息丢失的问题?

 

  1. 生产者弄丢了数据:

    生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。此时可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit

    // 开启事务
    channel.txSelect
    try {
        // 这里发送消息
    } catch (Exception e) {
        channel.txRollback
        // 这里再次重发这条消息
    }
    // 提交事务
    channel.txCommit

    但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能

    所以一般来说,如果你要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

    事务机制和 cnofirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

  2. RabbitMQ弄丢了数据:

    必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

    设置持久化有两个步骤

    • 创建 queue 的时候将其设置为持久化
      这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
    • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2
      就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
  3. 消费端弄丢了数据

    这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

    消息队列

    ack消息延迟:

    如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。→

    持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了。→

    关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次确保消费端处理完数据的时候,再在程序里 ack 一把。

  • 各类MQ对比

    特性 ActiveMQ RabbitMQ RocketMQ Kafka
    单击吞吐量 万 级 万级 10万级 10万级
    topic数量对吞吐量的影响     topic可以达到几百,几千个的级别,吞吐量会有较小幅度的下降 topic从几十个到几百个的时候,吞吐量会大幅度下降
    时效性 ms级 微秒级,延迟最低 ms级 ms级
    可用性 非常高,分布式架构 非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
    消息可靠性 低概率丢失数据   经过参数优化配置,可以做到0丢失 经过参数优化配置,可以做到0丢失
    功能支持 功能极其完备 基于erlang开发,所以并发能力很强,性能极其好,延时很低 MQ功能较为完善,还是分布式的,扩展性好 功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准
  • 代码

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
    </dependency>
    //生产者
    @Autowired
    private JmsTemplate jmsTemplate;
    @Autowired
    private Destination queueSolrDestination;
    @Autowired
    private Destination topicPageDestination;
    //更新索引库
    jmsTemplate.send(queueSolrDestination, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(jsonString);
        }
    });
    //生成商品详情页
    jmsTemplate.send(topicPageDestination, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            return session.createTextMessage(id + "");
        }
    });
    //消费者(监听)
    @Component
    public class PageListener implements MessageListener{
        @Autowired
        private ItemPageService itemPageService;
        @Override
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                String text = textMessage.getText();
                System.out.println("接收到消息:" + text);
                boolean b = itemPageService.genItemHtml(Long.parseLong(text));
                System.out.println("网页生成结果:" + b);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

相关标签: Java