欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

rabbitmq启动命令(rabbitmq消息队列5种模式)

程序员文章站 2023-11-17 23:11:46
文章目录:简介rabbitmq是一个由erlang开发的消息队列。消息队列用于应用间的异步协作。基本概念message:由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属...

文章目录:

rabbitmq启动命令(rabbitmq消息队列5种模式)

简介

rabbitmq是一个由erlang开发的消息队列。消息队列用于应用间的异步协作。

rabbitmq启动命令(rabbitmq消息队列5种模式)

基本概念

message:由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key、priority、delivery-mode(是否持久性存储)等。

publisher:消息的生产者。

exchange:接收消息并将消息路由到一个或多个queue。default exchange 是默认的直连交换机,名字为空字符串,每个新建队列都会自动绑定到默认交换机上,绑定的路由键名称与队列名称相同。

binding:通过binding将exchange和queue关联,这样exchange就知道将消息路由到哪个queue中。

queue:存储消息,队列的特性是先进先出。一个消息可分发到一个或多个队列。

virtual host:每个 vhost 本质上就是一个 mini 版的 rabbitmq 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 amqp 概念的基础,必须在连接时指定,rabbitmq 默认的 vhost 是 / 。当多个不同的用户使用同一个rabbitmq server提供的服务时,可以划分出多个vhost,每个用户在自己的vhost创建exchange和queue。

broker:消息队列服务器实体。

什么时候使用mq

对于一些不需要立即生效的操作,可以拆分出来,异步执行,使用消息队列实现。

以常见的订单系统为例,用户点击下单按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发短信通知。这种场景下就可以用 mq 。将短信通知放到 mq 异步执行,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 mq, 让主流程快速完结,而由另外的线程消费mq的消息。

优缺点

缺点:使用erlang实现,不利于二次开发和维护;性能较kafka差,持久化消息和ack确认的情况下生产和消费消息单机吞吐量大约在1-2万左右,kafka单机吞吐量在十万级别。

优点:有管理界面,方便使用;可靠性高;功能丰富,支持消息持久化、消息确认机制、多种消息分发机制。

exchange 类型

exchange分发消息时根据类型的不同分发策略不同,目前共四种类型:direct、fanout、topic、headers 。headers 模式根据消息的headers进行路由,此外 headers 交换器和 direct 交换器完全一致,但性能差很多。

exchange规则。

类型名称 类型描述 fanout 把所有发送到该exchange的消息路由到所有与它绑定的queue中 direct routing key==binding key topic 模糊匹配 headers exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的header属性进行匹配。

direct

direct交换机会将消息路由到binding key 和 routing key完全匹配的队列中。它是完全匹配、单播的模式。

rabbitmq启动命令(rabbitmq消息队列5种模式)


fanout

所有发到 fanout 类型交换机的消息都会路由到所有与该交换机绑定的队列上去。fanout 类型转发消息是最快的。

rabbitmq启动命令(rabbitmq消息队列5种模式)

topic

topic交换机使用routing key和binding key进行模糊匹配,匹配成功则将消息发送到相应的队列。routing key和binding key都是句点号“. ”分隔的字符串,binding key中可以存在两种特殊字符“*”与“#”,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词。

rabbitmq启动命令(rabbitmq消息队列5种模式)

headers

headers交换机是根据发送的消息内容中的headers属性进行路由的。在绑定queue与exchange时指定一组键值对;当消息发送到exchange时,rabbitmq会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配queue与exchange绑定时指定的键值对;如果完全匹配则消息会路由到该queue,否则不会路由到该queue。

消息丢失

消息丢失场景:生产者生产消息到rabbitmq server消息丢失、rabbitmq server存储的消息丢失和rabbitmq server到消费者消息丢失。

消息丢失从三个方面来解决:生产者确认机制、消费者手动确认消息和持久化。

生产者确认机制

生产者发送消息到队列,无法确保发送的消息成功的到达server。

解决方法:

  1. 事务机制。在一条消息发送之后会使发送端阻塞,等待rabbitmq的回应,之后才能继续发送下一条消息。性能差。
  2. 开启生产者确认机制,只要消息成功发送到交换机之后,rabbitmq就会发送一个ack给生产者(即使消息没有queue接收,也会发送ack)。如果消息没有成功发送到交换机,就会发送一条nack消息,提示发送失败。

在 springboot 是通过 publisher-confirms 参数来设置 confirm 模式:

spring:
    rabbitmq:   
        #开启 confirm 确认机制
        publisher-confirms: true

在生产端提供一个回调方法,当服务端确认了一条或者多条消息后,生产者会回调这个方法,根据具体的结果对消息进行后续处理,比如重新发送、记录日志等。

// 消息是否成功发送到exchange
final rabbittemplate.confirmcallback confirmcallback = (correlationdata correlationdata, boolean ack, string cause) -> {
            log.info("correlationdata: " + correlationdata);
            log.info("ack: " + ack);
            if(!ack) {
                log.info("异常处理....");
            }
    };

rabbittemplate.setconfirmcallback(confirmcallback);

路由不可达消息

生产者确认机制只确保消息正确到达交换机,对于从交换机路由到queue失败的消息,会被丢弃掉,导致消息丢失。

对于不可路由的消息,有两种处理方式:return消息机制和备份交换机。

return消息机制

return消息机制提供了回调函数 returncallback,当消息从交换机路由到queue失败才会回调这个方法。需要将mandatory 设置为 true ,才能监听到路由不可达的消息。

spring:
    rabbitmq:
        #触发returncallback必须设置mandatory=true, 否则exchange没有找到queue就会丢弃掉消息, 而不会触发returncallback
        template.mandatory: true

通过 returncallback 监听路由不可达消息。

    final rabbittemplate.returncallback returncallback = (message message, int replycode, string replytext, string exchange, string routingkey) ->
            log.info("return exchange: " + exchange + ", routingkey: "
                    + routingkey + ", replycode: " + replycode + ", replytext: " + replytext);
rabbittemplate.setreturncallback(returncallback);

当消息从交换机路由到queue失败时,会返回 return exchange: , routingkey: mail, replycode: 312, replytext: no_route。

备份交换机

备份交换机alternate-exchange 是一个普通的exchange,当你发送消息到对应的exchange时,没有匹配到queue,就会自动转移到备份交换机对应的queue,这样消息就不会丢失。

消费者手动消息确认

有可能消费者收到消息还没来得及处理mq服务就宕机了,导致消息丢失。因为消息者默认采用自动ack,一旦消费者收到消息后会通知mq server这条消息已经处理好了,mq 就会移除这条消息。

解决方法:消费者设置为手动确认消息。消费者处理完逻辑之后再给broker回复ack,表示消息已经成功消费,可以从broker中删除。当消息者消费失败的时候,给broker回复nack,根据配置决定重新入队还是从broker移除,或者进入死信队列。只要没收到消费者的 acknowledgment,broker 就会一直保存着这条消息,但不会 requeue,也不会分配给其他 消费者。

消费者设置手动ack:

#设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual

消息处理完,手动确认:

    @rabbitlistener(queues = rabbitmqconfig.mail_queue)
    public void onmessage(message message, channel channel) throws ioexception {

        try {
            thread.sleep(5000);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
        long deliverytag = message.getmessageproperties().getdeliverytag();
        //手工ack;第二个参数是multiple,设置为true,表示deliverytag序列号之前(包括自身)的消息都已经收到,设为false则表示收到一条消息
        channel.basicack(deliverytag, true);
        system.out.println("mail listener receive: " + new string(message.getbody()));
    }

当消息消费失败时,消费端给broker回复nack,如果consumer设置了requeue为false,则nack后broker会删除消息或者进入死信队列,否则消息会重新入队。

持久化

如果rabbitmq服务异常导致重启,将会导致消息丢失。rabbitmq提供了持久化的机制,将内存中的消息持久化到硬盘上,即使重启rabbitmq,消息也不会丢失。

消息持久化需要满足以下条件:

  1. 消息设置持久化。发布消息前,设置投递模式delivery mode为2,表示消息需要持久化。
  2. queue设置持久化。
  3. 交换机设置持久化。

当发布一条消息到交换机上时,rabbit会先把消息写入持久化日志,然后才向生产者发送响应。一旦从队列中消费了一条消息的话并且做了确认,rabbitmq会在持久化日志中移除这条消息。在消费消息前,如果rabbitmq重启的话,服务器会自动重建交换机和队列,加载持久化日志中的消息到相应的队列或者交换机上,保证消息不会丢失。

镜像队列

当mq发生故障时,会导致服务不可用。引入rabbitmq的镜像队列机制,将queue镜像到集群中其他的节点之上。如果集群中的一个节点失效了,能自动地切换到镜像中的另一个节点以保证服务的可用性。

通常每一个镜像队列都包含一个master和多个slave,分别对应于不同的节点。发送到镜像队列的所有消息总是被直接发送到master和所有的slave之上。除了publish外所有动作都只会向master发送,然后由master将命令执行的结果广播给slave,从镜像队列中的消费操作实际上是在master上执行的。

重复消费

消息重复的原因有两个:1.生产时消息重复,2.消费时消息重复。

生产者发送消息给mq,在mq确认的时候出现了网络波动,生产者没有收到确认,这时候生产者就会重新发送这条消息,导致mq会接收到重复消息。

消费者消费成功后,给mq确认的时候出现了网络波动,mq没有接收到确认,为了保证消息不丢失,mq就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。由于重复消息是由于网络原因造成的,无法避免。

解决方法:发送消息时让每个消息携带一个全局的唯一id,在消费消息时先判断消息是否已经被消费过,保证消息消费逻辑的幂等性。具体消费过程为:

  1. 消费者获取到消息后先根据id去查询redis/db是否存在该消息
  2. 如果不存在,则正常消费,消费完毕后写入redis/db
  3. 如果存在,则证明消息被消费过,直接丢弃

消费端限流

当 rabbitmq 服务器积压大量消息时,队列里的消息会大量涌入消费端,可能导致消费端服务器奔溃。这种情况下需要对消费端限流。

spring rabbitmq 提供参数 prefetch 可以设置单个请求处理的消息个数。如果消费者同时处理的消息到达最大值的时候,则该消费者会阻塞,不会消费新的消息,直到有消息 ack 才会消费新的消息。

开启消费端限流:

#在单个请求中处理的消息个数,unack的最大数量
spring.rabbitmq.listener.simple.prefetch=2

原生 rabbitmq 还提供 prefetchsize 和 global 两个参数。spring rabbitmq没有这两个参数。

//单条消息大小限制,0代表不限制
//global:限制限流功能是channel级别的还是consumer级别。当设置为false,consumer级别,限流功能生效,设置为true没有了限流功能,因为channel级别尚未实现。
void basicqos(int prefetchsize, int prefetchcount, boolean global) throws ioexception;

死信队列

消费失败的消息存放的队列。

消息消费失败的原因:

  • 消息被拒绝并且消息没有重新入队(requeue=false)
  • 消息超时未消费
  • 达到最大队列长度

设置死信队列的 exchange 和 queue,然后进行绑定:

 @bean
    public directexchange dlxexchange() {
        return new directexchange(rabbitmqconfig.dlx_exchange);
    }

    @bean
    public queue dlxqueue() {
        return new queue(rabbitmqconfig.dlx_queue, true);
    }

    @bean
    public binding bindingdeadexchange(queue dlxqueue, directexchange deadexchange) {
        return bindingbuilder.bind(dlxqueue).to(deadexchange).with(rabbitmqconfig.dlx_queue);
    }

在普通队列加上两个参数,绑定普通队列到死信队列。当消息消费失败时,消息会被路由到死信队列。

    @bean
    public queue sendsmsqueue() {
        map<string,object> arguments = new hashmap<>(2);
        // 绑定该队列到私信交换机
        arguments.put("x-dead-letter-exchange", rabbitmqconfig.dlx_exchange);
        arguments.put("x-dead-letter-routing-key", rabbitmqconfig.dlx_queue);
        return new queue(rabbitmqconfig.mail_queue, true, false, false, arguments);
    }

生产者完整代码:

@component
@slf4j
public class mqproducer {

    @autowired
    rabbittemplate rabbittemplate;

    @autowired
    randomutil randomutil;

    @autowired
    userservice userservice;

    final rabbittemplate.confirmcallback confirmcallback = (correlationdata correlationdata, boolean ack, string cause) -> {
            log.info("correlationdata: " + correlationdata);
            log.info("ack: " + ack);
            if(!ack) {
                log.info("异常处理....");
            }
    };


    final rabbittemplate.returncallback returncallback = (message message, int replycode, string replytext, string exchange, string routingkey) ->
            log.info("return exchange: " + exchange + ", routingkey: "
                    + routingkey + ", replycode: " + replycode + ", replytext: " + replytext);

    public void sendmail(string mail) {
        //貌似线程不安全 范围100000 - 999999
        integer random = randomutil.nextint(100000, 999999);
        map<string, string> map = new hashmap<>(2);
        string code = random.tostring();
        map.put("mail", mail);
        map.put("code", code);

        messageproperties mp = new messageproperties();
        //在生产环境中这里不用message,而是使用 fastjson 等工具将对象转换为 json 格式发送
        message msg = new message("tyson".getbytes(), mp);
        msg.getmessageproperties().setexpiration("3000");
        //如果消费端要设置为手工 ack ,那么生产端发送消息的时候一定发送 correlationdata ,并且全局唯一,用以唯一标识消息。
        correlationdata correlationdata = new correlationdata("1234567890"+new date());

        rabbittemplate.setmandatory(true);
        rabbittemplate.setconfirmcallback(confirmcallback);
        rabbittemplate.setreturncallback(returncallback);
        rabbittemplate.convertandsend(rabbitmqconfig.mail_queue, msg, correlationdata);

        //存入redis
        userservice.updatemailsendstate(mail, code, mailconfig.mail_state_wait);
    }
}

消费者完整代码:

@slf4j
@component
public class deadlistener {

    @rabbitlistener(queues = rabbitmqconfig.dlx_queue)
    public void onmessage(message message, channel channel) throws ioexception {

        try {
            thread.sleep(5000);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }
        long deliverytag = message.getmessageproperties().getdeliverytag();
        //手工ack
        channel.basicack(deliverytag,false);
        system.out.println("receive--1: " + new string(message.getbody()));
    }
}

当普通队列中有死信时,rabbitmq 就会自动的将这个消息重新发布到设置的死信交换机去,然后被路由到死信队列。可以监听死信队列中的消息做相应的处理。

其他

pull模式

pull模式主要是通过channel.basicget方法来获取消息,示例代码如下:

getresponse response = channel.basicget(queue_name, false);
system.out.println(new string(response.getbody()));
channel.basicack(response.getenvelope().getdeliverytag(),false);

消息过期时间

在生产端发送消息的时候可以给消息设置过期时间,单位为毫秒(ms)

message msg = new message("tyson".getbytes(), mp);
msg.getmessageproperties().setexpiration("3000");

也可以在创建队列的时候指定队列的ttl,从消息入队列开始计算,超过该时间的消息将会被移除。

参考链接

rabbitmq基础

springboot整合rabbitmq

rabbitmq之消息持久化

rabbitmq发送邮件代码

线上rabbitmq问题

最后给大家分享一个github仓库,上面放了200多本经典的计算机书籍,包括c语言、c++、java、python、前端、数据库、操作系统、计算机网络、数据结构和算法、机器学习、编程人生等,可以star一下,下次找书直接在上面搜索,